Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2020/12/03 15:57:37 UTC

[GitHub] [hive] maheshk114 opened a new pull request #1736: HIVE-24471 : Add support for combiner in hash mode group aggregation

maheshk114 opened a new pull request #1736:
URL: https://github.com/apache/hive/pull/1736


   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://cwiki.apache.org/confluence/display/Hive/HowToContribute
     2. Ensure that you have created an issue on the Hive project JIRA: https://issues.apache.org/jira/projects/HIVE/summary
     3. Ensure you have added or run the appropriate tests for your PR: 
     4. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]HIVE-XXXXX:  Your PR title ...'.
     5. Be sure to keep the PR description updated to reflect all changes.
     6. Please write your PR title to summarize what this PR proposes.
     7. If possible, provide a concise example to reproduce the issue for a faster review.
   
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description, screenshot and/or a reproducible example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Hive versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   -->
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] maheshk114 commented on a change in pull request #1736: HIVE-24471 : Add support for combiner in hash mode group aggregation

Posted by GitBox <gi...@apache.org>.
maheshk114 commented on a change in pull request #1736:
URL: https://github.com/apache/hive/pull/1736#discussion_r539842047



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByCombiner.java
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.mapreduce.combine.MRCombiner;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.MAPRED_REDUCER_CLASS;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+import static org.apache.hadoop.hive.serde2.lazy.fast.LazySimpleDeserializeRead.byteArrayCompareRanges;
+
+// Combiner for the vectorized group by operator. For a map side aggregate, the partially
+// aggregated records are sorted on the group by key. If the aggregation could not be done
+// earlier, for example because the hash table exceeded its memory limit or the first few
+// batches of records had few distinct values (NDVs), it can be finished cheaply here
+// because the records arrive sorted on the group by key.
+public class VectorGroupByCombiner extends MRCombiner {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      VectorGroupByCombiner.class.getName());
+  protected final Configuration conf;
+  protected final TezCounter combineInputRecordsCounter;
+  protected final TezCounter combineOutputRecordsCounter;
+  VectorAggregateExpression[] aggregators;
+  VectorAggregationBufferRow aggregationBufferRow;
+  protected transient LazyBinarySerializeWrite valueLazyBinarySerializeWrite;
+
+  // This helper object serializes LazyBinary format reducer values from columns of a row
+  // in a vectorized row batch.
+  protected transient VectorSerializeRow<LazyBinarySerializeWrite> valueVectorSerializeRow;
+
+  // The output buffer used to serialize a value into.
+  protected transient ByteStream.Output valueOutput;
+  DataInputBuffer valueBytesWritable;
+
+  // Only a minimal set of required configs is copied to the worker nodes. The "file."
+  // prefix hack is used to make sure these configs are among the ones that get copied.
+  protected static String confPrefixForWorker = "file.";
+
+  VectorDeserializeRow<LazyBinaryDeserializeRead> batchValueDeserializer;
+  int firstValueColumnOffset;
+  VectorizedRowBatchCtx batchContext = null;
+  int numValueCol = 0;
+  protected ReduceWork rw;
+  VectorizedRowBatch outputBatch = null;
+  VectorizedRowBatch inputBatch = null;
+  protected Deserializer inputKeyDeserializer = null;
+  protected ObjectInspector keyObjectInspector = null;
+  protected ObjectInspector valueObjectInspector = null;
+  protected StructObjectInspector valueStructInspectors = null;
+  protected StructObjectInspector keyStructInspector = null;
+
+  public VectorGroupByCombiner(TaskContext taskContext) throws HiveException, IOException {
+    super(taskContext);
+
+    combineInputRecordsCounter =
+            taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+    combineOutputRecordsCounter =
+            taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
+
+    conf = TezUtils.createConfFromUserPayload(taskContext.getUserPayload());
+    rw = getReduceWork();
+    if (rw == null) {
+      return;
+    }
+
+    if (rw.getReducer() instanceof VectorGroupByOperator) {
+      VectorGroupByOperator vectorGroupByOperator = (VectorGroupByOperator) rw.getReducer();
+      vectorGroupByOperator.initializeOp(this.conf);
+      this.aggregators = vectorGroupByOperator.getAggregators();
+      this.aggregationBufferRow = allocateAggregationBuffer();
+      batchContext = rw.getVectorizedRowBatchCtx();
+    }
+
+    try {
+      initObjectInspectors(rw.getTagToValueDesc().get(0), rw.getKeyDesc());
+      if (batchContext != null && numValueCol > 0) {
+        initVectorBatches();
+      }
+    } catch (SerDeException e) {
+      LOG.error("Fail to initialize VectorGroupByCombiner.", e);
+      throw new RuntimeException(e.getCause());
+    }
+  }
+
+  // Get the reduce work from the config. As a hack, the config name is prefixed with
+  // "file." so that the config is not filtered out.
+  private ReduceWork getReduceWork() {
+    String plan =  conf.get(confPrefixForWorker + HiveConf.ConfVars.PLAN.varname);
+    this.conf.set(HiveConf.ConfVars.PLAN.varname, plan);
+    if (conf.getBoolean(confPrefixForWorker + HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname,
+            true)) {
+      Path planPath = new Path(plan);
+      planPath = new Path(planPath, REDUCE_PLAN_NAME);
+      String planString = conf.get(confPrefixForWorker + planPath.toUri().getPath());
+      this.conf.set(planPath.toUri().getPath(), planString);
+      this.conf.set(HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname, "true");
+    } else {
+      this.conf.set(HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname, "false");
+    }
+    this.conf.set(HAS_REDUCE_WORK, "true");
+    this.conf.set(MAPRED_REDUCER_CLASS, ExecReducer.class.getName());
+
+    return Utilities.getReduceWork(conf);
+  }
+
+  private void initObjectInspectors(TableDesc valueTableDesc,TableDesc keyTableDesc)
+          throws SerDeException {
+    inputKeyDeserializer =
+            ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(), null);
+    SerDeUtils.initializeSerDe(inputKeyDeserializer, null,
+            keyTableDesc.getProperties(), null);
+    keyObjectInspector = inputKeyDeserializer.getObjectInspector();
+
+    keyStructInspector = (StructObjectInspector) keyObjectInspector;
+    firstValueColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
+
+    Deserializer inputValueDeserializer = (AbstractSerDe) ReflectionUtils.newInstance(
+            valueTableDesc.getDeserializerClass(), null);
+    SerDeUtils.initializeSerDe(inputValueDeserializer, null,
+            valueTableDesc.getProperties(), null);
+    valueObjectInspector = inputValueDeserializer.getObjectInspector();
+    valueStructInspectors = (StructObjectInspector) valueObjectInspector;
+    numValueCol = valueStructInspectors.getAllStructFieldRefs().size();
+  }
+
+  void initVectorBatches() throws HiveException {
+    inputBatch = batchContext.createVectorizedRowBatch();
+
+    // Create data buffers for value bytes column vectors.
+    for (int i = firstValueColumnOffset; i < inputBatch.numCols; i++) {
+      ColumnVector colVector = inputBatch.cols[i];
+      if (colVector instanceof BytesColumnVector) {
+        BytesColumnVector bytesColumnVector = (BytesColumnVector) colVector;
+        bytesColumnVector.initBuffer();
+      }
+    }
+
+    batchValueDeserializer =
+            new VectorDeserializeRow<>(
+                    new LazyBinaryDeserializeRead(
+                            VectorizedBatchUtil.typeInfosFromStructObjectInspector(
+                                    valueStructInspectors),
+                            true));
+    batchValueDeserializer.init(firstValueColumnOffset);
+
+    int[] valueColumnMap = new int[numValueCol];
+    for (int i = 0; i < numValueCol; i++) {
+      valueColumnMap[i] = i + firstValueColumnOffset;
+    }
+
+    valueLazyBinarySerializeWrite = new LazyBinarySerializeWrite(numValueCol);
+    valueVectorSerializeRow = new VectorSerializeRow<>(valueLazyBinarySerializeWrite);
+    valueVectorSerializeRow.init(VectorizedBatchUtil.typeInfosFromStructObjectInspector(
+            valueStructInspectors), valueColumnMap);
+    valueOutput = new ByteStream.Output();
+    valueVectorSerializeRow.setOutput(valueOutput);
+    outputBatch = batchContext.createVectorizedRowBatch();
+    valueBytesWritable = new DataInputBuffer();
+  }
+
+  private VectorAggregationBufferRow allocateAggregationBuffer() throws HiveException {
+    VectorAggregateExpression.AggregationBuffer[] aggregationBuffers =
+            new VectorAggregateExpression.AggregationBuffer[aggregators.length];
+    for (int i=0; i < aggregators.length; ++i) {
+      aggregationBuffers[i] = aggregators[i].getNewAggregationBuffer();
+      aggregators[i].reset(aggregationBuffers[i]);
+    }
+    return new VectorAggregationBufferRow(aggregationBuffers);
+  }
+
+  private void finishAggregation(DataInputBuffer key, IFile.Writer writer, boolean needFlush)
+          throws HiveException, IOException {
+    for (int i = 0; i < aggregators.length; ++i) {
+      try {
+        aggregators[i].aggregateInput(aggregationBufferRow.getAggregationBuffer(i), inputBatch);
+      } catch (HiveException e) {
+        throw new RuntimeException(e.getCause());
+      }
+    }
+
+    // If the input batch is full but the key has not changed, there is no need to flush.
+    // Just evaluate the aggregates and store the result in the aggregationBufferRow. The
+    // aggregate functions are incremental and will preserve correctness when the next
+    // batch arrives for aggregation.
+    if (!needFlush) {
+      return;
+    }
+
+    int colNum = firstValueColumnOffset;
+    for (int i = 0; i < aggregators.length; ++i) {
+      aggregators[i].assignRowColumn(outputBatch, 0, colNum++,
+              aggregationBufferRow.getAggregationBuffer(i));
+    }
+
+    valueLazyBinarySerializeWrite.reset();
+    valueVectorSerializeRow.serializeWrite(outputBatch, 0);
+    valueBytesWritable.reset(valueOutput.getData(), 0, valueOutput.getLength());
+    writer.append(key, valueBytesWritable);
+    combineOutputRecordsCounter.increment(1);
+    aggregationBufferRow.reset();
+    outputBatch.reset();
+  }
+
+  private void addValueToBatch(DataInputBuffer val, DataInputBuffer key,
+                      IFile.Writer writer, boolean needFLush) throws IOException, HiveException {
+    batchValueDeserializer.setBytes(val.getData(), val.getPosition(),
+            val.getLength() - val.getPosition());
+    batchValueDeserializer.deserialize(inputBatch, inputBatch.size);
+    inputBatch.size++;
+    if (needFLush || (inputBatch.size >= VectorizedRowBatch.DEFAULT_SIZE)) {
+      processVectorGroup(key, writer, needFLush);
+    }
+  }
+
+  private void processVectorGroup(DataInputBuffer key, IFile.Writer writer, boolean needFlush)
+          throws HiveException {
+    try {
+      finishAggregation(key, writer, needFlush);
+      inputBatch.reset();
+    } catch (Exception e) {
+      String rowString;
+      try {
+        rowString = inputBatch.toString();
+      } catch (Exception e2) {
+        rowString = "[Error getting row data with exception "
+                + StringUtils.stringifyException(e2) + " ]";
+      }
+      LOG.error("Hive Runtime Error while processing vector batch" + rowString, e);
+      throw new HiveException("Hive Runtime Error while processing vector batch", e);
+    }
+  }
+
+  protected void appendDirectlyToWriter(TezRawKeyValueIterator rawIter, IFile.Writer writer) {
+    long numRows = 0;
+    try {
+      do {
+        numRows++;
+        writer.append(rawIter.getKey(), rawIter.getValue());

Review comment:
       That is taken care of internally by the writer, I think, since I am not sure whether the keys are equal or not.





[GitHub] [hive] github-actions[bot] commented on pull request #1736: HIVE-24471 : Add support for combiner in hash mode group aggregation

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #1736:
URL: https://github.com/apache/hive/pull/1736#issuecomment-792135255


   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.
   Feel free to reach out on the dev@hive.apache.org list if the patch is in need of reviews.



[GitHub] [hive] t3rmin4t0r commented on a change in pull request #1736: HIVE-24471 : Add support for combiner in hash mode group aggregation

Posted by GitBox <gi...@apache.org>.
t3rmin4t0r commented on a change in pull request #1736:
URL: https://github.com/apache/hive/pull/1736#discussion_r539838422



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByCombiner.java
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByCombiner;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for the normal (non-vectorized) group by operator. For a map side aggregate, the
+// partially aggregated records are sorted on the group by key. If the aggregation could not
+// be done earlier, for example because the hash table exceeded its memory limit or the first
+// few batches of records had few distinct values (NDVs), it can be finished cheaply here
+// because the records arrive sorted on the group by key.
+public class GroupByCombiner extends VectorGroupByCombiner {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+          org.apache.hadoop.hive.ql.exec.GroupByCombiner.class.getName());
+
+  private transient GenericUDAFEvaluator[] aggregationEvaluators;
+  Deserializer valueDeserializer;
+  GenericUDAFEvaluator.AggregationBuffer[] aggregationBuffers;
+  GroupByOperator groupByOperator;
+  Serializer valueSerializer;
+  ObjectInspector aggrObjectInspector;
+  DataInputBuffer valueBuffer;
+  Object[] cachedValues;
+
+  public GroupByCombiner(TaskContext taskContext) throws HiveException, IOException {
+    super(taskContext);
+    if (rw != null) {
+      try {
+        groupByOperator = (GroupByOperator) rw.getReducer();
+
+        ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+        ois.add(keyObjectInspector);
+        ois.add(valueObjectInspector);
+        ObjectInspector[] rowObjectInspector = new ObjectInspector[1];
+        rowObjectInspector[0] =
+            ObjectInspectorFactory.getStandardStructObjectInspector(Utilities.reduceFieldNameList,
+                        ois);
+        groupByOperator.setInputObjInspectors(rowObjectInspector);
+        groupByOperator.initializeOp(conf);
+        aggregationBuffers = groupByOperator.getAggregationBuffers();
+        aggregationEvaluators = groupByOperator.getAggregationEvaluator();
+
+        TableDesc valueTableDesc = rw.getTagToValueDesc().get(0);
+        valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
+                .newInstance();
+        valueSerializer.initialize(null, valueTableDesc.getProperties());
+
+        valueDeserializer = (AbstractSerDe) ReflectionUtils.newInstance(
+                valueTableDesc.getDeserializerClass(), null);
+        SerDeUtils.initializeSerDe(valueDeserializer, null,
+                valueTableDesc.getProperties(), null);
+
+        aggrObjectInspector = groupByOperator.getAggrObjInspector();
+        valueBuffer = new DataInputBuffer();
+        cachedValues = new Object[aggregationEvaluators.length];
+      } catch (Exception e) {
+        LOG.error(" GroupByCombiner failed", e);
+        throw new RuntimeException(e.getMessage());
+      }
+    }
+  }
+
+  private void processAggregation(IFile.Writer writer, DataInputBuffer key)
+          throws Exception {
+    for (int i = 0; i < aggregationEvaluators.length; i++) {
+      cachedValues[i] = aggregationEvaluators[i].evaluate(aggregationBuffers[i]);
+    }
+    BytesWritable result = (BytesWritable) valueSerializer.serialize(cachedValues,
+            aggrObjectInspector);
+    valueBuffer.reset(result.getBytes(), result.getLength());
+    writer.append(key, valueBuffer);
+    combineOutputRecordsCounter.increment(1);
+    for (int i = 0; i < aggregationEvaluators.length; i++) {
+      aggregationEvaluators[i].reset(aggregationBuffers[i]);
+    }
+  }
+
+  private void updateAggregation(BytesWritable valWritable, DataInputBuffer value)
+          throws HiveException, SerDeException {
+    valWritable.set(value.getData(), value.getPosition(),
+            value.getLength() - value.getPosition());
+    Object row = valueDeserializer.deserialize(valWritable);
+    groupByOperator.updateAggregation(row);
+  }
+
+  private void processRows(TezRawKeyValueIterator rawIter, IFile.Writer writer) {
+    long numRows = 0;
+    try {
+      DataInputBuffer key = rawIter.getKey();
+      DataInputBuffer prevKey = new DataInputBuffer();
+      prevKey.reset(key.getData(), key.getPosition(), key.getLength() - key.getPosition());
+      BytesWritable valWritable = new BytesWritable();
+      do {
+        key = rawIter.getKey();
+        // For first iteration, prevKey is always same as key.
+        if (VectorGroupByCombiner.compare(key, prevKey) != 0) {
+          processAggregation(writer, prevKey);
+          prevKey.reset(key.getData(), key.getPosition(), key.getLength() - key.getPosition());
+        }
+        updateAggregation(valWritable, rawIter.getValue());
+        numRows++;
+      } while (rawIter.next());

Review comment:
       We only need to go into deserialization if there is more than one value for a key, right?

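       A hedged sketch (not the code in the patch) of the fast path this comment points at:
       buffer the previous raw key/value and deserialize only when two consecutive records
       actually share a key, so a key with a single value is forwarded as raw bytes. It
       assumes the combiner's existing compare(), updateAggregation() and processAggregation()
       methods; copyOf() is a hypothetical helper that deep-copies a DataInputBuffer.

       // Illustrative only: aggregate a group only once its key is seen to repeat.
       private void processRowsLazily(TezRawKeyValueIterator rawIter, IFile.Writer writer)
               throws Exception {
         DataInputBuffer prevKey = copyOf(rawIter.getKey());   // hypothetical deep copy
         DataInputBuffer prevVal = copyOf(rawIter.getValue());
         BytesWritable valWritable = new BytesWritable();
         boolean aggregating = false;
         while (rawIter.next()) {
           if (VectorGroupByCombiner.compare(rawIter.getKey(), prevKey) == 0) {
             // Second (or later) value for the same key: deserialization now pays off.
             if (!aggregating) {
               updateAggregation(valWritable, prevVal);
               aggregating = true;
             }
             updateAggregation(valWritable, rawIter.getValue());
           } else {
             if (aggregating) {
               processAggregation(writer, prevKey);   // flush the finished multi-value group
               aggregating = false;
             } else {
               writer.append(prevKey, prevVal);       // single value: pass the bytes through
             }
             prevKey = copyOf(rawIter.getKey());
             prevVal = copyOf(rawIter.getValue());
           }
         }
         // Flush whatever is buffered for the last key.
         if (aggregating) {
           processAggregation(writer, prevKey);
         } else {
           writer.append(prevKey, prevVal);
         }
       }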
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByCombiner.java
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.mapreduce.combine.MRCombiner;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.MAPRED_REDUCER_CLASS;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+import static org.apache.hadoop.hive.serde2.lazy.fast.LazySimpleDeserializeRead.byteArrayCompareRanges;
+
+// Combiner for the vectorized group by operator. For a map side aggregate, the partially
+// aggregated records are sorted on the group by key. If the aggregation could not be done
+// earlier, for example because the hash table exceeded its memory limit or the first few
+// batches of records had few distinct values (NDVs), it can be finished cheaply here
+// because the records arrive sorted on the group by key.
+public class VectorGroupByCombiner extends MRCombiner {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      VectorGroupByCombiner.class.getName());
+  protected final Configuration conf;
+  protected final TezCounter combineInputRecordsCounter;
+  protected final TezCounter combineOutputRecordsCounter;
+  VectorAggregateExpression[] aggregators;
+  VectorAggregationBufferRow aggregationBufferRow;
+  protected transient LazyBinarySerializeWrite valueLazyBinarySerializeWrite;
+
+  // This helper object serializes LazyBinary format reducer values from columns of a row
+  // in a vectorized row batch.
+  protected transient VectorSerializeRow<LazyBinarySerializeWrite> valueVectorSerializeRow;
+
+  // The output buffer used to serialize a value into.
+  protected transient ByteStream.Output valueOutput;
+  DataInputBuffer valueBytesWritable;
+
+  // Only a minimal set of required configs is copied to the worker nodes. The "file."
+  // prefix hack is used to make sure these configs are among the ones that get copied.
+  protected static String confPrefixForWorker = "file.";
+
+  VectorDeserializeRow<LazyBinaryDeserializeRead> batchValueDeserializer;
+  int firstValueColumnOffset;
+  VectorizedRowBatchCtx batchContext = null;
+  int numValueCol = 0;
+  protected ReduceWork rw;
+  VectorizedRowBatch outputBatch = null;
+  VectorizedRowBatch inputBatch = null;
+  protected Deserializer inputKeyDeserializer = null;
+  protected ObjectInspector keyObjectInspector = null;
+  protected ObjectInspector valueObjectInspector = null;
+  protected StructObjectInspector valueStructInspectors = null;
+  protected StructObjectInspector keyStructInspector = null;
+
+  public VectorGroupByCombiner(TaskContext taskContext) throws HiveException, IOException {
+    super(taskContext);
+
+    combineInputRecordsCounter =
+            taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+    combineOutputRecordsCounter =
+            taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
+
+    conf = TezUtils.createConfFromUserPayload(taskContext.getUserPayload());
+    rw = getReduceWork();
+    if (rw == null) {
+      return;
+    }
+
+    if (rw.getReducer() instanceof VectorGroupByOperator) {
+      VectorGroupByOperator vectorGroupByOperator = (VectorGroupByOperator) rw.getReducer();
+      vectorGroupByOperator.initializeOp(this.conf);
+      this.aggregators = vectorGroupByOperator.getAggregators();
+      this.aggregationBufferRow = allocateAggregationBuffer();
+      batchContext = rw.getVectorizedRowBatchCtx();
+    }
+
+    try {
+      initObjectInspectors(rw.getTagToValueDesc().get(0), rw.getKeyDesc());
+      if (batchContext != null && numValueCol > 0) {
+        initVectorBatches();
+      }
+    } catch (SerDeException e) {
+      LOG.error("Fail to initialize VectorGroupByCombiner.", e);
+      throw new RuntimeException(e.getCause());
+    }
+  }
+
+  // Get the reduce work from the config. As a hack, the config name is prefixed with
+  // "file." so that the config is not filtered out.
+  private ReduceWork getReduceWork() {
+    String plan =  conf.get(confPrefixForWorker + HiveConf.ConfVars.PLAN.varname);
+    this.conf.set(HiveConf.ConfVars.PLAN.varname, plan);
+    if (conf.getBoolean(confPrefixForWorker + HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname,
+            true)) {
+      Path planPath = new Path(plan);
+      planPath = new Path(planPath, REDUCE_PLAN_NAME);
+      String planString = conf.get(confPrefixForWorker + planPath.toUri().getPath());
+      this.conf.set(planPath.toUri().getPath(), planString);
+      this.conf.set(HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname, "true");
+    } else {
+      this.conf.set(HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname, "false");
+    }
+    this.conf.set(HAS_REDUCE_WORK, "true");
+    this.conf.set(MAPRED_REDUCER_CLASS, ExecReducer.class.getName());
+
+    return Utilities.getReduceWork(conf);
+  }
+
+  private void initObjectInspectors(TableDesc valueTableDesc,TableDesc keyTableDesc)
+          throws SerDeException {
+    inputKeyDeserializer =
+            ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(), null);
+    SerDeUtils.initializeSerDe(inputKeyDeserializer, null,
+            keyTableDesc.getProperties(), null);
+    keyObjectInspector = inputKeyDeserializer.getObjectInspector();
+
+    keyStructInspector = (StructObjectInspector) keyObjectInspector;
+    firstValueColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
+
+    Deserializer inputValueDeserializer = (AbstractSerDe) ReflectionUtils.newInstance(
+            valueTableDesc.getDeserializerClass(), null);
+    SerDeUtils.initializeSerDe(inputValueDeserializer, null,
+            valueTableDesc.getProperties(), null);
+    valueObjectInspector = inputValueDeserializer.getObjectInspector();
+    valueStructInspectors = (StructObjectInspector) valueObjectInspector;
+    numValueCol = valueStructInspectors.getAllStructFieldRefs().size();
+  }
+
+  void initVectorBatches() throws HiveException {
+    inputBatch = batchContext.createVectorizedRowBatch();
+
+    // Create data buffers for value bytes column vectors.
+    for (int i = firstValueColumnOffset; i < inputBatch.numCols; i++) {
+      ColumnVector colVector = inputBatch.cols[i];
+      if (colVector instanceof BytesColumnVector) {
+        BytesColumnVector bytesColumnVector = (BytesColumnVector) colVector;
+        bytesColumnVector.initBuffer();
+      }
+    }
+
+    batchValueDeserializer =
+            new VectorDeserializeRow<>(
+                    new LazyBinaryDeserializeRead(
+                            VectorizedBatchUtil.typeInfosFromStructObjectInspector(
+                                    valueStructInspectors),
+                            true));
+    batchValueDeserializer.init(firstValueColumnOffset);
+
+    int[] valueColumnMap = new int[numValueCol];
+    for (int i = 0; i < numValueCol; i++) {
+      valueColumnMap[i] = i + firstValueColumnOffset;
+    }
+
+    valueLazyBinarySerializeWrite = new LazyBinarySerializeWrite(numValueCol);
+    valueVectorSerializeRow = new VectorSerializeRow<>(valueLazyBinarySerializeWrite);
+    valueVectorSerializeRow.init(VectorizedBatchUtil.typeInfosFromStructObjectInspector(
+            valueStructInspectors), valueColumnMap);
+    valueOutput = new ByteStream.Output();
+    valueVectorSerializeRow.setOutput(valueOutput);
+    outputBatch = batchContext.createVectorizedRowBatch();
+    valueBytesWritable = new DataInputBuffer();
+  }
+
+  private VectorAggregationBufferRow allocateAggregationBuffer() throws HiveException {
+    VectorAggregateExpression.AggregationBuffer[] aggregationBuffers =
+            new VectorAggregateExpression.AggregationBuffer[aggregators.length];
+    for (int i=0; i < aggregators.length; ++i) {
+      aggregationBuffers[i] = aggregators[i].getNewAggregationBuffer();
+      aggregators[i].reset(aggregationBuffers[i]);
+    }
+    return new VectorAggregationBufferRow(aggregationBuffers);
+  }
+
+  private void finishAggregation(DataInputBuffer key, IFile.Writer writer, boolean needFlush)
+          throws HiveException, IOException {
+    for (int i = 0; i < aggregators.length; ++i) {
+      try {
+        aggregators[i].aggregateInput(aggregationBufferRow.getAggregationBuffer(i), inputBatch);
+      } catch (HiveException e) {
+        throw new RuntimeException(e.getCause());
+      }
+    }
+
+    // If the input batch is full but the key has not changed, there is no need to flush.
+    // Just evaluate the aggregates and store the result in the aggregationBufferRow. The
+    // aggregate functions are incremental and will preserve correctness when the next
+    // batch arrives for aggregation.
+    if (!needFlush) {
+      return;
+    }
+
+    int colNum = firstValueColumnOffset;
+    for (int i = 0; i < aggregators.length; ++i) {
+      aggregators[i].assignRowColumn(outputBatch, 0, colNum++,
+              aggregationBufferRow.getAggregationBuffer(i));
+    }
+
+    valueLazyBinarySerializeWrite.reset();
+    valueVectorSerializeRow.serializeWrite(outputBatch, 0);
+    valueBytesWritable.reset(valueOutput.getData(), 0, valueOutput.getLength());
+    writer.append(key, valueBytesWritable);
+    combineOutputRecordsCounter.increment(1);
+    aggregationBufferRow.reset();
+    outputBatch.reset();
+  }
+
+  private void addValueToBatch(DataInputBuffer val, DataInputBuffer key,
+                      IFile.Writer writer, boolean needFLush) throws IOException, HiveException {
+    batchValueDeserializer.setBytes(val.getData(), val.getPosition(),
+            val.getLength() - val.getPosition());
+    batchValueDeserializer.deserialize(inputBatch, inputBatch.size);
+    inputBatch.size++;
+    if (needFLush || (inputBatch.size >= VectorizedRowBatch.DEFAULT_SIZE)) {
+      processVectorGroup(key, writer, needFLush);
+    }
+  }
+
+  private void processVectorGroup(DataInputBuffer key, IFile.Writer writer, boolean needFlush)
+          throws HiveException {
+    try {
+      finishAggregation(key, writer, needFlush);
+      inputBatch.reset();
+    } catch (Exception e) {
+      String rowString;
+      try {
+        rowString = inputBatch.toString();
+      } catch (Exception e2) {
+        rowString = "[Error getting row data with exception "
+                + StringUtils.stringifyException(e2) + " ]";
+      }
+      LOG.error("Hive Runtime Error while processing vector batch" + rowString, e);
+      throw new HiveException("Hive Runtime Error while processing vector batch", e);
+    }
+  }
+
+  protected void appendDirectlyToWriter(TezRawKeyValueIterator rawIter, IFile.Writer writer) {
+    long numRows = 0;
+    try {
+      do {
+        numRows++;
+        writer.append(rawIter.getKey(), rawIter.getValue());
+      } while (rawIter.next());
+      combineInputRecordsCounter.increment(numRows);
+      combineOutputRecordsCounter.increment(numRows);
+    } catch(IOException e) {
+      LOG.error("Append to writer failed", e);
+      throw new RuntimeException(e.getMessage());
+    }
+  }
+
+  private void appendToWriter(DataInputBuffer val, DataInputBuffer key, IFile.Writer writer) {
+    try {
+      writer.append(key, val);
+      combineOutputRecordsCounter.increment(1);
+    } catch(IOException e) {
+      LOG.error("Append value list to writer failed", e);
+      throw new RuntimeException(e.getMessage());
+    }
+  }
+
+  public static int compare(DataInputBuffer buf1, DataInputBuffer buf2) {

Review comment:
       Is it compare or is it equals? (Equals has a fast-path exit for different length)

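       For illustration only, a minimal sketch of the equals-style check being hinted at,
       with the length fast path taken before any byte comparison; this is not the method
       in the patch.

       // Equality is enough for the combiner, so differing lengths can short-circuit.
       public static boolean keysEqual(DataInputBuffer buf1, DataInputBuffer buf2) {
         int len1 = buf1.getLength() - buf1.getPosition();
         int len2 = buf2.getLength() - buf2.getPosition();
         if (len1 != len2) {
           return false;                 // fast path: different lengths can never be equal
         }
         byte[] b1 = buf1.getData();
         byte[] b2 = buf2.getData();
         int p1 = buf1.getPosition();
         int p2 = buf2.getPosition();
         for (int i = 0; i < len1; i++) {
           if (b1[p1 + i] != b2[p2 + i]) {
             return false;
           }
         }
         return true;
       }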
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByCombiner.java
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByCombiner;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for the normal (non-vectorized) group by operator. For a map side aggregate, the
+// partially aggregated records are sorted on the group by key. If the aggregation could not
+// be done earlier, for example because the hash table exceeded its memory limit or the first
+// few batches of records had few distinct values (NDVs), it can be finished cheaply here
+// because the records arrive sorted on the group by key.
+public class GroupByCombiner extends VectorGroupByCombiner {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+          org.apache.hadoop.hive.ql.exec.GroupByCombiner.class.getName());
+
+  private transient GenericUDAFEvaluator[] aggregationEvaluators;
+  Deserializer valueDeserializer;
+  GenericUDAFEvaluator.AggregationBuffer[] aggregationBuffers;
+  GroupByOperator groupByOperator;
+  Serializer valueSerializer;
+  ObjectInspector aggrObjectInspector;
+  DataInputBuffer valueBuffer;
+  Object[] cachedValues;
+
+  public GroupByCombiner(TaskContext taskContext) throws HiveException, IOException {
+    super(taskContext);
+    if (rw != null) {
+      try {
+        groupByOperator = (GroupByOperator) rw.getReducer();
+
+        ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+        ois.add(keyObjectInspector);
+        ois.add(valueObjectInspector);
+        ObjectInspector[] rowObjectInspector = new ObjectInspector[1];
+        rowObjectInspector[0] =
+            ObjectInspectorFactory.getStandardStructObjectInspector(Utilities.reduceFieldNameList,
+                        ois);
+        groupByOperator.setInputObjInspectors(rowObjectInspector);
+        groupByOperator.initializeOp(conf);
+        aggregationBuffers = groupByOperator.getAggregationBuffers();
+        aggregationEvaluators = groupByOperator.getAggregationEvaluator();
+
+        TableDesc valueTableDesc = rw.getTagToValueDesc().get(0);
+        valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
+                .newInstance();
+        valueSerializer.initialize(null, valueTableDesc.getProperties());
+
+        valueDeserializer = (AbstractSerDe) ReflectionUtils.newInstance(
+                valueTableDesc.getDeserializerClass(), null);
+        SerDeUtils.initializeSerDe(valueDeserializer, null,
+                valueTableDesc.getProperties(), null);
+
+        aggrObjectInspector = groupByOperator.getAggrObjInspector();
+        valueBuffer = new DataInputBuffer();
+        cachedValues = new Object[aggregationEvaluators.length];
+      } catch (Exception e) {
+        LOG.error(" GroupByCombiner failed", e);
+        throw new RuntimeException(e.getMessage());
+      }
+    }
+  }
+
+  private void processAggregation(IFile.Writer writer, DataInputBuffer key)
+          throws Exception {
+    for (int i = 0; i < aggregationEvaluators.length; i++) {
+      cachedValues[i] = aggregationEvaluators[i].evaluate(aggregationBuffers[i]);
+    }
+    BytesWritable result = (BytesWritable) valueSerializer.serialize(cachedValues,
+            aggrObjectInspector);
+    valueBuffer.reset(result.getBytes(), result.getLength());
+    writer.append(key, valueBuffer);
+    combineOutputRecordsCounter.increment(1);
+    for (int i = 0; i < aggregationEvaluators.length; i++) {
+      aggregationEvaluators[i].reset(aggregationBuffers[i]);
+    }
+  }
+
+  private void updateAggregation(BytesWritable valWritable, DataInputBuffer value)
+          throws HiveException, SerDeException {
+    valWritable.set(value.getData(), value.getPosition(),
+            value.getLength() - value.getPosition());
+    Object row = valueDeserializer.deserialize(valWritable);
+    groupByOperator.updateAggregation(row);
+  }
+
+  private void processRows(TezRawKeyValueIterator rawIter, IFile.Writer writer) {
+    long numRows = 0;
+    try {
+      DataInputBuffer key = rawIter.getKey();
+      DataInputBuffer prevKey = new DataInputBuffer();
+      prevKey.reset(key.getData(), key.getPosition(), key.getLength() - key.getPosition());
+      BytesWritable valWritable = new BytesWritable();
+      do {
+        key = rawIter.getKey();
+        // For first iteration, prevKey is always same as key.
+        if (VectorGroupByCombiner.compare(key, prevKey) != 0) {

Review comment:
       Is this comparison necessary? Doesn't the combiner give you one iterator per key?

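       Purely to illustrate the question (whether Tez actually hands the combiner one
       iterator per distinct key is exactly what is being asked): under that assumption the
       loop would collapse to roughly the sketch below, with no prevKey tracking and no
       compare() call. It reuses the combiner's updateAggregation() and processAggregation().

       // Hedged sketch under a per-key-iterator assumption; not what the patch does.
       private void processOneKey(TezRawKeyValueIterator rawIter, IFile.Writer writer)
               throws Exception {
         BytesWritable valWritable = new BytesWritable();
         DataInputBuffer key = rawIter.getKey();
         do {
           updateAggregation(valWritable, rawIter.getValue());
         } while (rawIter.next());
         processAggregation(writer, key);
       }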
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
##########
@@ -712,6 +751,12 @@ private void processKey(Object row,
 
   @Override
   public void process(Object row, int tag) throws HiveException {
+    if (hashAggr) {
+      if (getConfiguration().get("forced.streaming.mode", "false").equals("true")) {

Review comment:
       This should go into a GroupByDesc

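       A hedged sketch of what moving the flag into the descriptor could look like; the
       field and accessor names below are hypothetical and do not exist in GroupByDesc today.

       // In GroupByDesc (hypothetical additions, populated once at planning time):
       private boolean forcedStreamingMode;
       public boolean isForcedStreamingMode() { return forcedStreamingMode; }
       public void setForcedStreamingMode(boolean forcedStreamingMode) {
         this.forcedStreamingMode = forcedStreamingMode;
       }

       // GroupByOperator.process() could then read the descriptor instead of probing the
       // raw Configuration string on every call ('conf' being the operator's GroupByDesc):
       if (hashAggr && conf.isForcedStreamingMode()) {
         // switch from hash aggregation to streaming mode, as the patch does today
       }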
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByCombiner.java
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.mapreduce.combine.MRCombiner;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.MAPRED_REDUCER_CLASS;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+import static org.apache.hadoop.hive.serde2.lazy.fast.LazySimpleDeserializeRead.byteArrayCompareRanges;
+
+// Combiner for the vectorized group by operator. For a map side aggregate, the partially
+// aggregated records are sorted on the group by key. If the aggregation could not be done
+// earlier, for example because the hash table exceeded its memory limit or the first few
+// batches of records had few distinct values (NDVs), it can be finished cheaply here
+// because the records arrive sorted on the group by key.
+public class VectorGroupByCombiner extends MRCombiner {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      VectorGroupByCombiner.class.getName());
+  protected final Configuration conf;
+  protected final TezCounter combineInputRecordsCounter;
+  protected final TezCounter combineOutputRecordsCounter;
+  VectorAggregateExpression[] aggregators;
+  VectorAggregationBufferRow aggregationBufferRow;
+  protected transient LazyBinarySerializeWrite valueLazyBinarySerializeWrite;
+
+  // This helper object serializes LazyBinary format reducer values from columns of a row
+  // in a vectorized row batch.
+  protected transient VectorSerializeRow<LazyBinarySerializeWrite> valueVectorSerializeRow;
+
+  // The output buffer used to serialize a value into.
+  protected transient ByteStream.Output valueOutput;
+  DataInputBuffer valueBytesWritable;
+
+  // Only a minimal set of required configs is copied to the worker nodes. The "file."
+  // prefix hack is used to make sure these configs are among the ones that get copied.
+  protected static String confPrefixForWorker = "file.";
+
+  VectorDeserializeRow<LazyBinaryDeserializeRead> batchValueDeserializer;
+  int firstValueColumnOffset;
+  VectorizedRowBatchCtx batchContext = null;
+  int numValueCol = 0;
+  protected ReduceWork rw;
+  VectorizedRowBatch outputBatch = null;
+  VectorizedRowBatch inputBatch = null;
+  protected Deserializer inputKeyDeserializer = null;
+  protected ObjectInspector keyObjectInspector = null;
+  protected ObjectInspector valueObjectInspector = null;
+  protected StructObjectInspector valueStructInspectors = null;
+  protected StructObjectInspector keyStructInspector = null;
+
+  public VectorGroupByCombiner(TaskContext taskContext) throws HiveException, IOException {
+    super(taskContext);
+
+    combineInputRecordsCounter =
+            taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+    combineOutputRecordsCounter =
+            taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
+
+    conf = TezUtils.createConfFromUserPayload(taskContext.getUserPayload());
+    rw = getReduceWork();
+    if (rw == null) {
+      return;
+    }
+
+    if (rw.getReducer() instanceof VectorGroupByOperator) {
+      VectorGroupByOperator vectorGroupByOperator = (VectorGroupByOperator) rw.getReducer();
+      vectorGroupByOperator.initializeOp(this.conf);
+      this.aggregators = vectorGroupByOperator.getAggregators();
+      this.aggregationBufferRow = allocateAggregationBuffer();
+      batchContext = rw.getVectorizedRowBatchCtx();
+    }
+
+    try {
+      initObjectInspectors(rw.getTagToValueDesc().get(0), rw.getKeyDesc());
+      if (batchContext != null && numValueCol > 0) {
+        initVectorBatches();
+      }
+    } catch (SerDeException e) {
+      LOG.error("Fail to initialize VectorGroupByCombiner.", e);
+      throw new RuntimeException(e.getCause());
+    }
+  }
+
+  // Get the reduce work from the config. As a hack, the config name is prefixed with
+  // "file." so that the config is not filtered out.
+  private ReduceWork getReduceWork() {
+    String plan =  conf.get(confPrefixForWorker + HiveConf.ConfVars.PLAN.varname);
+    this.conf.set(HiveConf.ConfVars.PLAN.varname, plan);
+    if (conf.getBoolean(confPrefixForWorker + HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname,
+            true)) {
+      Path planPath = new Path(plan);
+      planPath = new Path(planPath, REDUCE_PLAN_NAME);
+      String planString = conf.get(confPrefixForWorker + planPath.toUri().getPath());
+      this.conf.set(planPath.toUri().getPath(), planString);
+      this.conf.set(HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname, "true");
+    } else {
+      this.conf.set(HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname, "false");
+    }
+    this.conf.set(HAS_REDUCE_WORK, "true");
+    this.conf.set(MAPRED_REDUCER_CLASS, ExecReducer.class.getName());
+
+    return Utilities.getReduceWork(conf);
+  }
+
+  private void initObjectInspectors(TableDesc valueTableDesc,TableDesc keyTableDesc)
+          throws SerDeException {
+    inputKeyDeserializer =
+            ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(), null);
+    SerDeUtils.initializeSerDe(inputKeyDeserializer, null,
+            keyTableDesc.getProperties(), null);
+    keyObjectInspector = inputKeyDeserializer.getObjectInspector();
+
+    keyStructInspector = (StructObjectInspector) keyObjectInspector;
+    firstValueColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
+
+    Deserializer inputValueDeserializer = (AbstractSerDe) ReflectionUtils.newInstance(
+            valueTableDesc.getDeserializerClass(), null);
+    SerDeUtils.initializeSerDe(inputValueDeserializer, null,
+            valueTableDesc.getProperties(), null);
+    valueObjectInspector = inputValueDeserializer.getObjectInspector();
+    valueStructInspectors = (StructObjectInspector) valueObjectInspector;
+    numValueCol = valueStructInspectors.getAllStructFieldRefs().size();
+  }
+
+  void initVectorBatches() throws HiveException {
+    inputBatch = batchContext.createVectorizedRowBatch();
+
+    // Create data buffers for value bytes column vectors.
+    for (int i = firstValueColumnOffset; i < inputBatch.numCols; i++) {
+      ColumnVector colVector = inputBatch.cols[i];
+      if (colVector instanceof BytesColumnVector) {
+        BytesColumnVector bytesColumnVector = (BytesColumnVector) colVector;
+        bytesColumnVector.initBuffer();
+      }
+    }
+
+    batchValueDeserializer =
+            new VectorDeserializeRow<>(
+                    new LazyBinaryDeserializeRead(
+                            VectorizedBatchUtil.typeInfosFromStructObjectInspector(
+                                    valueStructInspectors),
+                            true));
+    batchValueDeserializer.init(firstValueColumnOffset);
+
+    int[] valueColumnMap = new int[numValueCol];
+    for (int i = 0; i < numValueCol; i++) {
+      valueColumnMap[i] = i + firstValueColumnOffset;
+    }
+
+    valueLazyBinarySerializeWrite = new LazyBinarySerializeWrite(numValueCol);
+    valueVectorSerializeRow = new VectorSerializeRow<>(valueLazyBinarySerializeWrite);
+    valueVectorSerializeRow.init(VectorizedBatchUtil.typeInfosFromStructObjectInspector(
+            valueStructInspectors), valueColumnMap);
+    valueOutput = new ByteStream.Output();
+    valueVectorSerializeRow.setOutput(valueOutput);
+    outputBatch = batchContext.createVectorizedRowBatch();
+    valueBytesWritable = new DataInputBuffer();
+  }
+
+  private VectorAggregationBufferRow allocateAggregationBuffer() throws HiveException {
+    VectorAggregateExpression.AggregationBuffer[] aggregationBuffers =
+            new VectorAggregateExpression.AggregationBuffer[aggregators.length];
+    for (int i=0; i < aggregators.length; ++i) {
+      aggregationBuffers[i] = aggregators[i].getNewAggregationBuffer();
+      aggregators[i].reset(aggregationBuffers[i]);
+    }
+    return new VectorAggregationBufferRow(aggregationBuffers);
+  }
+
+  private void finishAggregation(DataInputBuffer key, IFile.Writer writer, boolean needFlush)
+          throws HiveException, IOException {
+    for (int i = 0; i < aggregators.length; ++i) {
+      try {
+        aggregators[i].aggregateInput(aggregationBufferRow.getAggregationBuffer(i), inputBatch);
+      } catch (HiveException e) {
+        throw new RuntimeException(e.getCause());
+      }
+    }
+
+    // In case the input batch is full but the keys are still the same, we need not flush.
+    // Only evaluate the aggregates and store them in the aggregationBufferRow. The aggregate
+    // functions are incremental and will take care of correctness when the next batch comes
+    // for aggregation.
+    if (!needFlush) {
+      return;
+    }
+
+    int colNum = firstValueColumnOffset;
+    for (int i = 0; i < aggregators.length; ++i) {
+      aggregators[i].assignRowColumn(outputBatch, 0, colNum++,
+              aggregationBufferRow.getAggregationBuffer(i));
+    }
+
+    valueLazyBinarySerializeWrite.reset();
+    valueVectorSerializeRow.serializeWrite(outputBatch, 0);
+    valueBytesWritable.reset(valueOutput.getData(), 0, valueOutput.getLength());
+    writer.append(key, valueBytesWritable);
+    combineOutputRecordsCounter.increment(1);
+    aggregationBufferRow.reset();
+    outputBatch.reset();
+  }
+
+  private void addValueToBatch(DataInputBuffer val, DataInputBuffer key,
+                      IFile.Writer writer, boolean needFLush) throws IOException, HiveException {
+    batchValueDeserializer.setBytes(val.getData(), val.getPosition(),
+            val.getLength() - val.getPosition());
+    batchValueDeserializer.deserialize(inputBatch, inputBatch.size);
+    inputBatch.size++;
+    if (needFLush || (inputBatch.size >= VectorizedRowBatch.DEFAULT_SIZE)) {
+      processVectorGroup(key, writer, needFLush);
+    }
+  }
+
+  private void processVectorGroup(DataInputBuffer key, IFile.Writer writer, boolean needFlush)
+          throws HiveException {
+    try {
+      finishAggregation(key, writer, needFlush);
+      inputBatch.reset();
+    } catch (Exception e) {
+      String rowString;
+      try {
+        rowString = inputBatch.toString();
+      } catch (Exception e2) {
+        rowString = "[Error getting row data with exception "
+                + StringUtils.stringifyException(e2) + " ]";
+      }
+      LOG.error("Hive Runtime Error while processing vector batch" + rowString, e);
+      throw new HiveException("Hive Runtime Error while processing vector batch", e);
+    }
+  }
+
+  protected void appendDirectlyToWriter(TezRawKeyValueIterator rawIter, IFile.Writer writer) {
+    long numRows = 0;
+    try {
+      do {
+        numRows++;
+        writer.append(rawIter.getKey(), rawIter.getValue());

Review comment:
       Need a Tez internal SAME_KEY here, right?
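
       For illustration, a minimal sketch of that idea: remember the previous key and, when the
       next key is byte-identical, append the value with Tez's repeat-key sentinel so the writer
       can apply its same-key (RLE) encoding. The sentinel name (IFile.REPEAT_KEY) and the
       keysEqual helper below are assumptions for illustration, not code from this PR.

           // Sketch only: IFile.REPEAT_KEY and keysEqual() are assumptions, not code from this PR.
           DataInputBuffer prevKey = new DataInputBuffer();
           boolean first = true;
           do {
             DataInputBuffer key = rawIter.getKey();
             if (!first && keysEqual(key, prevKey)) {
               // Same key as the previous record: let the writer apply its same-key encoding.
               writer.append(IFile.REPEAT_KEY, rawIter.getValue());
             } else {
               writer.append(key, rawIter.getValue());
               prevKey.reset(key.getData(), key.getPosition(),
                   key.getLength() - key.getPosition());
               first = false;
             }
           } while (rawIter.next());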




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] maheshk114 commented on a change in pull request #1736: HIVE-24471 : Add support for combiner in hash mode group aggregation

Posted by GitBox <gi...@apache.org>.
maheshk114 commented on a change in pull request #1736:
URL: https://github.com/apache/hive/pull/1736#discussion_r551109482



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByCombiner.java
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByCombiner;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for the normal (non-vectorized) group by operator. In case of map-side aggregation,
+// the partially aggregated records are sorted on the group by key. If, for some reason (for
+// example, the hash table exceeded its memory limit or the first few batches of records had
+// few distinct values), the aggregation was not done, it can be done cheaply here because the
+// records are sorted on the group by key.
+public class GroupByCombiner extends VectorGroupByCombiner {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+          org.apache.hadoop.hive.ql.exec.GroupByCombiner.class.getName());
+
+  private transient GenericUDAFEvaluator[] aggregationEvaluators;
+  Deserializer valueDeserializer;
+  GenericUDAFEvaluator.AggregationBuffer[] aggregationBuffers;
+  GroupByOperator groupByOperator;
+  Serializer valueSerializer;
+  ObjectInspector aggrObjectInspector;
+  DataInputBuffer valueBuffer;
+  Object[] cachedValues;
+
+  public GroupByCombiner(TaskContext taskContext) throws HiveException, IOException {
+    super(taskContext);
+    if (rw != null) {
+      try {
+        groupByOperator = (GroupByOperator) rw.getReducer();
+
+        ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+        ois.add(keyObjectInspector);
+        ois.add(valueObjectInspector);
+        ObjectInspector[] rowObjectInspector = new ObjectInspector[1];
+        rowObjectInspector[0] =
+            ObjectInspectorFactory.getStandardStructObjectInspector(Utilities.reduceFieldNameList,
+                        ois);
+        groupByOperator.setInputObjInspectors(rowObjectInspector);
+        groupByOperator.initializeOp(conf);
+        aggregationBuffers = groupByOperator.getAggregationBuffers();
+        aggregationEvaluators = groupByOperator.getAggregationEvaluator();
+
+        TableDesc valueTableDesc = rw.getTagToValueDesc().get(0);
+        if ((aggregationEvaluators == null) || (aggregationEvaluators.length != numValueCol)) {
+          //TODO : Need to support distinct. The logic has to be changed to extract only
+          // those aggregates which are not part of distinct.
+          LOG.info(" Combiner is disabled as the number of value columns does" +
+                  " not match with number of aggregators");
+          numValueCol = 0;
+          rw = null;
+          return;
+        }
+        valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
+                .newInstance();
+        valueSerializer.initialize(null, valueTableDesc.getProperties());
+
+        valueDeserializer = (AbstractSerDe) ReflectionUtils.newInstance(
+                valueTableDesc.getDeserializerClass(), null);
+        SerDeUtils.initializeSerDe(valueDeserializer, null,
+                valueTableDesc.getProperties(), null);
+
+        aggrObjectInspector = groupByOperator.getAggrObjInspector();
+        valueBuffer = new DataInputBuffer();
+        cachedValues = new Object[aggregationEvaluators.length];
+      } catch (Exception e) {
+        LOG.error(" GroupByCombiner failed", e);
+        throw new RuntimeException(e.getMessage());
+      }
+    }
+  }
+
+  private void processAggregation(IFile.Writer writer, DataInputBuffer key)
+          throws Exception {
+    for (int i = 0; i < aggregationEvaluators.length; i++) {
+      cachedValues[i] = aggregationEvaluators[i].evaluate(aggregationBuffers[i]);
+    }
+    BytesWritable result = (BytesWritable) valueSerializer.serialize(cachedValues,
+            aggrObjectInspector);
+    valueBuffer.reset(result.getBytes(), result.getLength());
+    writer.append(key, valueBuffer);
+    combineOutputRecordsCounter.increment(1);
+    for (int i = 0; i < aggregationEvaluators.length; i++) {
+      aggregationEvaluators[i].reset(aggregationBuffers[i]);
+    }
+  }
+
+  private void updateAggregation(BytesWritable valWritable, DataInputBuffer value)
+          throws HiveException, SerDeException {
+    valWritable.set(value.getData(), value.getPosition(),
+            value.getLength() - value.getPosition());
+    Object row = valueDeserializer.deserialize(valWritable);
+    groupByOperator.updateAggregation(row);
+  }
+
+  private void processRows(TezRawKeyValueIterator rawIter, IFile.Writer writer) {
+    long numRows = 0;
+    try {
+      DataInputBuffer key = rawIter.getKey();
+      DataInputBuffer prevKey = new DataInputBuffer();

Review comment:
       done

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -515,6 +516,10 @@ DAG build(JobConf conf, TezWork tezWork, Path scratchDir, Context ctx,
           Edge e = null;
 
           TezEdgeProperty edgeProp = tezWork.getEdgeProperty(workUnit, v);
+
+          //Add the reducer plan to config to create the combiner object in case of group by.
+          wxConf = GroupByCombiner.setCombinerInConf(v, wxConf, workToConf.get(v));

Review comment:
       When the vertex is created, we may not yet know its child or parent, and the nodes may get reordered after the vertex is created. So, to be on the safe side, this is done here once all the optimizations are complete.
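
       For reference, a rough sketch of wiring the combiner into the vertex config at that point
       (the Tez property name and the exact configs copied are assumptions for illustration, not
       necessarily what this PR does; a real implementation would presumably also check that dest
       contains a group by reducer, which is omitted here):

           // Sketch only: register the Hive combiner for this vertex and copy the reducer plan
           // location with the "file." prefix so it survives config filtering on the worker.
           public static JobConf setCombinerInConf(BaseWork dest, JobConf conf, JobConf destConf) {
             if (conf == null || destConf == null) {
               return conf;
             }
             conf.set("tez.runtime.combiner.class", GroupByCombiner.class.getName());
             conf.set("file." + HiveConf.ConfVars.PLAN.varname,
                 destConf.get(HiveConf.ConfVars.PLAN.varname));
             return conf;
           }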




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] maheshk114 commented on a change in pull request #1736: HIVE-24471 : Add support for combiner in hash mode group aggregation

Posted by GitBox <gi...@apache.org>.
maheshk114 commented on a change in pull request #1736:
URL: https://github.com/apache/hive/pull/1736#discussion_r539842254



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByCombiner.java
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.mapreduce.combine.MRCombiner;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.MAPRED_REDUCER_CLASS;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+import static org.apache.hadoop.hive.serde2.lazy.fast.LazySimpleDeserializeRead.byteArrayCompareRanges;
+
+// Combiner for the vectorized group by operator. In case of map-side aggregation, the partially
+// aggregated records are sorted on the group by key. If, for some reason (for example, the hash
+// table exceeded its memory limit or the first few batches of records had few distinct values),
+// the aggregation was not done, it can be done cheaply here because the records are sorted on
+// the group by key.
+public class VectorGroupByCombiner extends MRCombiner {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      VectorGroupByCombiner.class.getName());
+  protected final Configuration conf;
+  protected final TezCounter combineInputRecordsCounter;
+  protected final TezCounter combineOutputRecordsCounter;
+  VectorAggregateExpression[] aggregators;
+  VectorAggregationBufferRow aggregationBufferRow;
+  protected transient LazyBinarySerializeWrite valueLazyBinarySerializeWrite;
+
+  // This helper object serializes LazyBinary format reducer values from columns of a row
+  // in a vectorized row batch.
+  protected transient VectorSerializeRow<LazyBinarySerializeWrite> valueVectorSerializeRow;
+
+  // The output buffer used to serialize a value into.
+  protected transient ByteStream.Output valueOutput;
+  DataInputBuffer valueBytesWritable;
+
+  // Only the minimal required configs are copied to the worker nodes. This hack (the "file."
+  // prefix) ensures that these configs are also copied to the worker node.
+  protected static String confPrefixForWorker = "file.";
+
+  VectorDeserializeRow<LazyBinaryDeserializeRead> batchValueDeserializer;
+  int firstValueColumnOffset;
+  VectorizedRowBatchCtx batchContext = null;
+  int numValueCol = 0;
+  protected ReduceWork rw;
+  VectorizedRowBatch outputBatch = null;
+  VectorizedRowBatch inputBatch = null;
+  protected Deserializer inputKeyDeserializer = null;
+  protected ObjectInspector keyObjectInspector = null;
+  protected ObjectInspector valueObjectInspector = null;
+  protected StructObjectInspector valueStructInspectors = null;
+  protected StructObjectInspector keyStructInspector = null;
+
+  public VectorGroupByCombiner(TaskContext taskContext) throws HiveException, IOException {
+    super(taskContext);
+
+    combineInputRecordsCounter =
+            taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+    combineOutputRecordsCounter =
+            taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
+
+    conf = TezUtils.createConfFromUserPayload(taskContext.getUserPayload());
+    rw = getReduceWork();
+    if (rw == null) {
+      return;
+    }
+
+    if (rw.getReducer() instanceof VectorGroupByOperator) {
+      VectorGroupByOperator vectorGroupByOperator = (VectorGroupByOperator) rw.getReducer();
+      vectorGroupByOperator.initializeOp(this.conf);
+      this.aggregators = vectorGroupByOperator.getAggregators();
+      this.aggregationBufferRow = allocateAggregationBuffer();
+      batchContext = rw.getVectorizedRowBatchCtx();
+    }
+
+    try {
+      initObjectInspectors(rw.getTagToValueDesc().get(0), rw.getKeyDesc());
+      if (batchContext != null && numValueCol > 0) {
+        initVectorBatches();
+      }
+    } catch (SerDeException e) {
+      LOG.error("Fail to initialize VectorGroupByCombiner.", e);
+      throw new RuntimeException(e.getCause());
+    }
+  }
+
+  // Get the reduce work from the config. A small hack is used here: the config name is prefixed
+  // with "file." so that the config is not filtered out.
+  private ReduceWork getReduceWork() {
+    String plan =  conf.get(confPrefixForWorker + HiveConf.ConfVars.PLAN.varname);
+    this.conf.set(HiveConf.ConfVars.PLAN.varname, plan);
+    if (conf.getBoolean(confPrefixForWorker + HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname,
+            true)) {
+      Path planPath = new Path(plan);
+      planPath = new Path(planPath, REDUCE_PLAN_NAME);
+      String planString = conf.get(confPrefixForWorker + planPath.toUri().getPath());
+      this.conf.set(planPath.toUri().getPath(), planString);
+      this.conf.set(HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname, "true");
+    } else {
+      this.conf.set(HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname, "false");
+    }
+    this.conf.set(HAS_REDUCE_WORK, "true");
+    this.conf.set(MAPRED_REDUCER_CLASS, ExecReducer.class.getName());
+
+    return Utilities.getReduceWork(conf);
+  }
+
+  private void initObjectInspectors(TableDesc valueTableDesc,TableDesc keyTableDesc)
+          throws SerDeException {
+    inputKeyDeserializer =
+            ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(), null);
+    SerDeUtils.initializeSerDe(inputKeyDeserializer, null,
+            keyTableDesc.getProperties(), null);
+    keyObjectInspector = inputKeyDeserializer.getObjectInspector();
+
+    keyStructInspector = (StructObjectInspector) keyObjectInspector;
+    firstValueColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
+
+    Deserializer inputValueDeserializer = (AbstractSerDe) ReflectionUtils.newInstance(
+            valueTableDesc.getDeserializerClass(), null);
+    SerDeUtils.initializeSerDe(inputValueDeserializer, null,
+            valueTableDesc.getProperties(), null);
+    valueObjectInspector = inputValueDeserializer.getObjectInspector();
+    valueStructInspectors = (StructObjectInspector) valueObjectInspector;
+    numValueCol = valueStructInspectors.getAllStructFieldRefs().size();
+  }
+
+  void initVectorBatches() throws HiveException {
+    inputBatch = batchContext.createVectorizedRowBatch();
+
+    // Create data buffers for value bytes column vectors.
+    for (int i = firstValueColumnOffset; i < inputBatch.numCols; i++) {
+      ColumnVector colVector = inputBatch.cols[i];
+      if (colVector instanceof BytesColumnVector) {
+        BytesColumnVector bytesColumnVector = (BytesColumnVector) colVector;
+        bytesColumnVector.initBuffer();
+      }
+    }
+
+    batchValueDeserializer =
+            new VectorDeserializeRow<>(
+                    new LazyBinaryDeserializeRead(
+                            VectorizedBatchUtil.typeInfosFromStructObjectInspector(
+                                    valueStructInspectors),
+                            true));
+    batchValueDeserializer.init(firstValueColumnOffset);
+
+    int[] valueColumnMap = new int[numValueCol];
+    for (int i = 0; i < numValueCol; i++) {
+      valueColumnMap[i] = i + firstValueColumnOffset;
+    }
+
+    valueLazyBinarySerializeWrite = new LazyBinarySerializeWrite(numValueCol);
+    valueVectorSerializeRow = new VectorSerializeRow<>(valueLazyBinarySerializeWrite);
+    valueVectorSerializeRow.init(VectorizedBatchUtil.typeInfosFromStructObjectInspector(
+            valueStructInspectors), valueColumnMap);
+    valueOutput = new ByteStream.Output();
+    valueVectorSerializeRow.setOutput(valueOutput);
+    outputBatch = batchContext.createVectorizedRowBatch();
+    valueBytesWritable = new DataInputBuffer();
+  }
+
+  private VectorAggregationBufferRow allocateAggregationBuffer() throws HiveException {
+    VectorAggregateExpression.AggregationBuffer[] aggregationBuffers =
+            new VectorAggregateExpression.AggregationBuffer[aggregators.length];
+    for (int i=0; i < aggregators.length; ++i) {
+      aggregationBuffers[i] = aggregators[i].getNewAggregationBuffer();
+      aggregators[i].reset(aggregationBuffers[i]);
+    }
+    return new VectorAggregationBufferRow(aggregationBuffers);
+  }
+
+  private void finishAggregation(DataInputBuffer key, IFile.Writer writer, boolean needFlush)
+          throws HiveException, IOException {
+    for (int i = 0; i < aggregators.length; ++i) {
+      try {
+        aggregators[i].aggregateInput(aggregationBufferRow.getAggregationBuffer(i), inputBatch);
+      } catch (HiveException e) {
+        throw new RuntimeException(e.getCause());
+      }
+    }
+
+    // In case the input batch is full but the keys are still the same, we need not flush.
+    // Only evaluate the aggregates and store them in the aggregationBufferRow. The aggregate
+    // functions are incremental and will take care of correctness when the next batch comes
+    // for aggregation.
+    if (!needFlush) {
+      return;
+    }
+
+    int colNum = firstValueColumnOffset;
+    for (int i = 0; i < aggregators.length; ++i) {
+      aggregators[i].assignRowColumn(outputBatch, 0, colNum++,
+              aggregationBufferRow.getAggregationBuffer(i));
+    }
+
+    valueLazyBinarySerializeWrite.reset();
+    valueVectorSerializeRow.serializeWrite(outputBatch, 0);
+    valueBytesWritable.reset(valueOutput.getData(), 0, valueOutput.getLength());
+    writer.append(key, valueBytesWritable);
+    combineOutputRecordsCounter.increment(1);
+    aggregationBufferRow.reset();
+    outputBatch.reset();
+  }
+
+  private void addValueToBatch(DataInputBuffer val, DataInputBuffer key,
+                      IFile.Writer writer, boolean needFLush) throws IOException, HiveException {
+    batchValueDeserializer.setBytes(val.getData(), val.getPosition(),
+            val.getLength() - val.getPosition());
+    batchValueDeserializer.deserialize(inputBatch, inputBatch.size);
+    inputBatch.size++;
+    if (needFLush || (inputBatch.size >= VectorizedRowBatch.DEFAULT_SIZE)) {
+      processVectorGroup(key, writer, needFLush);
+    }
+  }
+
+  private void processVectorGroup(DataInputBuffer key, IFile.Writer writer, boolean needFlush)
+          throws HiveException {
+    try {
+      finishAggregation(key, writer, needFlush);
+      inputBatch.reset();
+    } catch (Exception e) {
+      String rowString;
+      try {
+        rowString = inputBatch.toString();
+      } catch (Exception e2) {
+        rowString = "[Error getting row data with exception "
+                + StringUtils.stringifyException(e2) + " ]";
+      }
+      LOG.error("Hive Runtime Error while processing vector batch" + rowString, e);
+      throw new HiveException("Hive Runtime Error while processing vector batch", e);
+    }
+  }
+
+  protected void appendDirectlyToWriter(TezRawKeyValueIterator rawIter, IFile.Writer writer) {
+    long numRows = 0;
+    try {
+      do {
+        numRows++;
+        writer.append(rawIter.getKey(), rawIter.getValue());
+      } while (rawIter.next());
+      combineInputRecordsCounter.increment(numRows);
+      combineOutputRecordsCounter.increment(numRows);
+    } catch(IOException e) {
+      LOG.error("Append to writer failed", e);
+      throw new RuntimeException(e.getMessage());
+    }
+  }
+
+  private void appendToWriter(DataInputBuffer val, DataInputBuffer key, IFile.Writer writer) {
+    try {
+      writer.append(key, val);
+      combineOutputRecordsCounter.increment(1);
+    } catch(IOException e) {
+      LOG.error("Append value list to writer failed", e);
+      throw new RuntimeException(e.getMessage());
+    }
+  }
+
+  public static int compare(DataInputBuffer buf1, DataInputBuffer buf2) {

Review comment:
       Makes sense. I will check for equality.
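
       For reference, a minimal, self-contained sketch of an equality-only check over the two
       buffers (plain byte comparison; the PR may instead reuse byteArrayCompareRanges, whose
       exact signature is not shown here):

           import org.apache.hadoop.io.DataInputBuffer;

           // Hypothetical helper, not the PR's code: equality-only check of two key buffers.
           public final class KeyEquality {
             private KeyEquality() {
             }

             public static boolean keysEqual(DataInputBuffer buf1, DataInputBuffer buf2) {
               int len1 = buf1.getLength() - buf1.getPosition();
               int len2 = buf2.getLength() - buf2.getPosition();
               if (len1 != len2) {
                 // Keys of different lengths can never be equal.
                 return false;
               }
               byte[] b1 = buf1.getData();
               byte[] b2 = buf2.getData();
               int off1 = buf1.getPosition();
               int off2 = buf2.getPosition();
               for (int i = 0; i < len1; i++) {
                 if (b1[off1 + i] != b2[off2 + i]) {
                   return false;
                 }
               }
               return true;
             }
           }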




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] maheshk114 commented on a change in pull request #1736: HIVE-24471 : Add support for combiner in hash mode group aggregation

Posted by GitBox <gi...@apache.org>.
maheshk114 commented on a change in pull request #1736:
URL: https://github.com/apache/hive/pull/1736#discussion_r551109525



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByCombiner.java
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByCombiner;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for the normal (non-vectorized) group by operator. In case of map-side aggregation,
+// the partially aggregated records are sorted on the group by key. If, for some reason (for
+// example, the hash table exceeded its memory limit or the first few batches of records had
+// few distinct values), the aggregation was not done, it can be done cheaply here because the
+// records are sorted on the group by key.
+public class GroupByCombiner extends VectorGroupByCombiner {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+          org.apache.hadoop.hive.ql.exec.GroupByCombiner.class.getName());
+
+  private transient GenericUDAFEvaluator[] aggregationEvaluators;
+  Deserializer valueDeserializer;
+  GenericUDAFEvaluator.AggregationBuffer[] aggregationBuffers;
+  GroupByOperator groupByOperator;
+  Serializer valueSerializer;
+  ObjectInspector aggrObjectInspector;
+  DataInputBuffer valueBuffer;
+  Object[] cachedValues;
+
+  public GroupByCombiner(TaskContext taskContext) throws HiveException, IOException {
+    super(taskContext);
+    if (rw != null) {
+      try {
+        groupByOperator = (GroupByOperator) rw.getReducer();
+
+        ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+        ois.add(keyObjectInspector);
+        ois.add(valueObjectInspector);
+        ObjectInspector[] rowObjectInspector = new ObjectInspector[1];
+        rowObjectInspector[0] =
+            ObjectInspectorFactory.getStandardStructObjectInspector(Utilities.reduceFieldNameList,
+                        ois);
+        groupByOperator.setInputObjInspectors(rowObjectInspector);
+        groupByOperator.initializeOp(conf);
+        aggregationBuffers = groupByOperator.getAggregationBuffers();
+        aggregationEvaluators = groupByOperator.getAggregationEvaluator();
+
+        TableDesc valueTableDesc = rw.getTagToValueDesc().get(0);
+        if ((aggregationEvaluators == null) || (aggregationEvaluators.length != numValueCol)) {
+          //TODO : Need to support distinct. The logic has to be changed to extract only

Review comment:
       HIVE-24580

##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -1790,6 +1790,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     HIVEALIAS("hive.alias", "", ""),
     HIVEMAPSIDEAGGREGATE("hive.map.aggr", true, "Whether to use map-side aggregation in Hive Group By queries"),
     HIVEGROUPBYSKEW("hive.groupby.skewindata", false, "Whether there is skew in data to optimize group by queries"),
+
+    HIVE_ENABLE_COMBINER_FOR_GROUP_BY("hive.enable.combiner.for.groupby", true,
+        "Whether to enable tez combiner to aggregate the records after sorting is done"),

Review comment:
       done

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByCombiner.java
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByCombiner;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for the normal (non-vectorized) group by operator. In case of map-side aggregation,
+// the partially aggregated records are sorted on the group by key. If, for some reason (for
+// example, the hash table exceeded its memory limit or the first few batches of records had
+// few distinct values), the aggregation was not done, it can be done cheaply here because the
+// records are sorted on the group by key.
+public class GroupByCombiner extends VectorGroupByCombiner {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+          org.apache.hadoop.hive.ql.exec.GroupByCombiner.class.getName());
+
+  private transient GenericUDAFEvaluator[] aggregationEvaluators;
+  Deserializer valueDeserializer;
+  GenericUDAFEvaluator.AggregationBuffer[] aggregationBuffers;
+  GroupByOperator groupByOperator;
+  Serializer valueSerializer;
+  ObjectInspector aggrObjectInspector;
+  DataInputBuffer valueBuffer;
+  Object[] cachedValues;
+
+  public GroupByCombiner(TaskContext taskContext) throws HiveException, IOException {
+    super(taskContext);
+    if (rw != null) {
+      try {
+        groupByOperator = (GroupByOperator) rw.getReducer();
+
+        ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+        ois.add(keyObjectInspector);
+        ois.add(valueObjectInspector);
+        ObjectInspector[] rowObjectInspector = new ObjectInspector[1];
+        rowObjectInspector[0] =
+            ObjectInspectorFactory.getStandardStructObjectInspector(Utilities.reduceFieldNameList,
+                        ois);
+        groupByOperator.setInputObjInspectors(rowObjectInspector);
+        groupByOperator.initializeOp(conf);
+        aggregationBuffers = groupByOperator.getAggregationBuffers();
+        aggregationEvaluators = groupByOperator.getAggregationEvaluator();
+
+        TableDesc valueTableDesc = rw.getTagToValueDesc().get(0);
+        if ((aggregationEvaluators == null) || (aggregationEvaluators.length != numValueCol)) {
+          //TODO : Need to support distinct. The logic has to be changed to extract only
+          // those aggregates which are not part of distinct.
+          LOG.info(" Combiner is disabled as the number of value columns does" +
+                  " not match with number of aggregators");
+          numValueCol = 0;
+          rw = null;
+          return;
+        }
+        valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
+                .newInstance();
+        valueSerializer.initialize(null, valueTableDesc.getProperties());
+
+        valueDeserializer = (AbstractSerDe) ReflectionUtils.newInstance(
+                valueTableDesc.getDeserializerClass(), null);
+        SerDeUtils.initializeSerDe(valueDeserializer, null,
+                valueTableDesc.getProperties(), null);
+
+        aggrObjectInspector = groupByOperator.getAggrObjInspector();
+        valueBuffer = new DataInputBuffer();
+        cachedValues = new Object[aggregationEvaluators.length];
+      } catch (Exception e) {
+        LOG.error(" GroupByCombiner failed", e);
+        throw new RuntimeException(e.getMessage());
+      }
+    }
+  }
+
+  private void processAggregation(IFile.Writer writer, DataInputBuffer key)
+          throws Exception {
+    for (int i = 0; i < aggregationEvaluators.length; i++) {
+      cachedValues[i] = aggregationEvaluators[i].evaluate(aggregationBuffers[i]);
+    }
+    BytesWritable result = (BytesWritable) valueSerializer.serialize(cachedValues,
+            aggrObjectInspector);
+    valueBuffer.reset(result.getBytes(), result.getLength());
+    writer.append(key, valueBuffer);
+    combineOutputRecordsCounter.increment(1);
+    for (int i = 0; i < aggregationEvaluators.length; i++) {
+      aggregationEvaluators[i].reset(aggregationBuffers[i]);
+    }
+  }
+
+  private void updateAggregation(BytesWritable valWritable, DataInputBuffer value)
+          throws HiveException, SerDeException {
+    valWritable.set(value.getData(), value.getPosition(),
+            value.getLength() - value.getPosition());
+    Object row = valueDeserializer.deserialize(valWritable);
+    groupByOperator.updateAggregation(row);
+  }
+
+  private void processRows(TezRawKeyValueIterator rawIter, IFile.Writer writer) {
+    long numRows = 0;
+    try {
+      DataInputBuffer key = rawIter.getKey();
+      DataInputBuffer prevKey = new DataInputBuffer();
+      prevKey.reset(key.getData(), key.getPosition(), key.getLength() - key.getPosition());
+      BytesWritable valWritable = new BytesWritable();
+      DataInputBuffer prevVal = new DataInputBuffer();
+      prevVal.reset(rawIter.getValue().getData(), rawIter.getValue().getPosition(),
+              rawIter.getValue().getLength() - rawIter.getValue().getPosition());
+      int numSameKey = 0;
+      while (rawIter.next()) {
+        key = rawIter.getKey();
+        if (!VectorGroupByCombiner.compare(key, prevKey)) {
+          // if current key is not equal to the previous key then we have to emit the
+          // record. In case only one record was present for this key, then no need to
+          // do aggregation, We can directly append the key and value. For key with more
+          // than one record, we have to update the aggregation for the current value only
+          // as for previous values (records) aggregation is already done in previous
+          // iteration of loop.
+          if (numSameKey != 0) {
+            updateAggregation(valWritable, prevVal);
+            processAggregation(writer, prevKey);
+          } else {
+            writer.append(prevKey, prevVal);
+          }
+          prevKey.reset(key.getData(), key.getPosition(),
+                  key.getLength() - key.getPosition());
+          numSameKey = 0;
+        } else {
+          // If there are more than one record with same key then update the aggregation.
+          updateAggregation(valWritable, prevVal);
+          numSameKey++;
+        }
+        prevVal.reset(rawIter.getValue().getData(), rawIter.getValue().getPosition(),
+                rawIter.getValue().getLength() - rawIter.getValue().getPosition());
+        numRows++;
+      }
+      if (numSameKey != 0) {
+        updateAggregation(valWritable, prevVal);
+        processAggregation(writer, prevKey);
+      } else {
+        writer.append(prevKey, prevVal);
+      }
+      combineInputRecordsCounter.increment(numRows);
+    } catch(Exception e) {
+      LOG.error("processRows failed", e);
+      throw new RuntimeException(e.getMessage());
+    }
+  }
+
+  @Override
+  public void combine(TezRawKeyValueIterator rawIter, IFile.Writer writer) {
+    try {
+      if (!rawIter.next()) {
+        return;
+      }
+      if (numValueCol == 0) {
+        // For no aggregation, RLE in writer will take care of reduction.
+        appendDirectlyToWriter(rawIter, writer);
+      } else {
+        processRows(rawIter, writer);
+      }
+    } catch (IOException e) {
+      LOG.error("Failed to combine rows", e);
+      throw new RuntimeException(e.getMessage());
+    }
+  }
+
+  public static JobConf setCombinerInConf(BaseWork dest, JobConf conf, JobConf destConf) {
+    //TODO need to change it to a proper config
+    if (conf == null || !conf.get(HiveConf.ConfVars.HIVE_ENABLE_COMBINER_FOR_GROUP_BY.varname).

Review comment:
       done
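
       For reference, a sketch of reading the flag through a null-safe lookup with an explicit
       default (an assumption about what the cleaned-up check might look like, not necessarily
       what the follow-up commit does):

           // Sketch only: avoid calling .get(...) on a key that may be unset.
           boolean combinerEnabled = conf != null
               && conf.getBoolean(HiveConf.ConfVars.HIVE_ENABLE_COMBINER_FOR_GROUP_BY.varname, true);
           if (!combinerEnabled) {
             return conf;
           }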




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] maheshk114 commented on a change in pull request #1736: HIVE-24471 : Add support for combiner in hash mode group aggregation

Posted by GitBox <gi...@apache.org>.
maheshk114 commented on a change in pull request #1736:
URL: https://github.com/apache/hive/pull/1736#discussion_r539843531



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
##########
@@ -712,6 +751,12 @@ private void processKey(Object row,
 
   @Override
   public void process(Object row, int tag) throws HiveException {
+    if (hashAggr) {
+      if (getConfiguration().get("forced.streaming.mode", "false").equals("true")) {

Review comment:
       I have removed it in the next commit; it had been added for testing only.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] maheshk114 commented on a change in pull request #1736: HIVE-24471 : Add support for combiner in hash mode group aggregation

Posted by GitBox <gi...@apache.org>.
maheshk114 commented on a change in pull request #1736:
URL: https://github.com/apache/hive/pull/1736#discussion_r539842254



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByCombiner.java
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.mapreduce.combine.MRCombiner;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.MAPRED_REDUCER_CLASS;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+import static org.apache.hadoop.hive.serde2.lazy.fast.LazySimpleDeserializeRead.byteArrayCompareRanges;
+
+// Combiner for vectorized group by operator. In case of map side aggregate, the partially
+// aggregated records are sorted based on group by key. If because of some reasons, like hash
+// table memory exceeded the limit or the first few batches of records have less ndvs, the
+// aggregation is not done, then here the aggregation can be done cheaply as the records
+// are sorted based on group by key.
+public class VectorGroupByCombiner extends MRCombiner {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      VectorGroupByCombiner.class.getName());
+  protected final Configuration conf;
+  protected final TezCounter combineInputRecordsCounter;
+  protected final TezCounter combineOutputRecordsCounter;
+  VectorAggregateExpression[] aggregators;
+  VectorAggregationBufferRow aggregationBufferRow;
+  protected transient LazyBinarySerializeWrite valueLazyBinarySerializeWrite;
+
+  // This helper object serializes LazyBinary format reducer values from columns of a row
+  // in a vectorized row batch.
+  protected transient VectorSerializeRow<LazyBinarySerializeWrite> valueVectorSerializeRow;
+
+  // The output buffer used to serialize a value into.
+  protected transient ByteStream.Output valueOutput;
+  DataInputBuffer valueBytesWritable;
+
+  // Only required minimal configs are copied to the worker nodes. This hack (file.) is
+  // done to include these configs to be copied to the worker node.
+  protected static String confPrefixForWorker = "file.";
+
+  VectorDeserializeRow<LazyBinaryDeserializeRead> batchValueDeserializer;
+  int firstValueColumnOffset;
+  VectorizedRowBatchCtx batchContext = null;
+  int numValueCol = 0;
+  protected ReduceWork rw;
+  VectorizedRowBatch outputBatch = null;
+  VectorizedRowBatch inputBatch = null;
+  protected Deserializer inputKeyDeserializer = null;
+  protected ObjectInspector keyObjectInspector = null;
+  protected ObjectInspector valueObjectInspector = null;
+  protected StructObjectInspector valueStructInspectors = null;
+  protected StructObjectInspector keyStructInspector = null;
+
+  public VectorGroupByCombiner(TaskContext taskContext) throws HiveException, IOException {
+    super(taskContext);
+
+    combineInputRecordsCounter =
+            taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+    combineOutputRecordsCounter =
+            taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
+
+    conf = TezUtils.createConfFromUserPayload(taskContext.getUserPayload());
+    rw = getReduceWork();
+    if (rw == null) {
+      return;
+    }
+
+    if (rw.getReducer() instanceof VectorGroupByOperator) {
+      VectorGroupByOperator vectorGroupByOperator = (VectorGroupByOperator) rw.getReducer();
+      vectorGroupByOperator.initializeOp(this.conf);
+      this.aggregators = vectorGroupByOperator.getAggregators();
+      this.aggregationBufferRow = allocateAggregationBuffer();
+      batchContext = rw.getVectorizedRowBatchCtx();
+    }
+
+    try {
+      initObjectInspectors(rw.getTagToValueDesc().get(0), rw.getKeyDesc());
+      if (batchContext != null && numValueCol > 0) {
+        initVectorBatches();
+      }
+    } catch (SerDeException e) {
+      LOG.error("Fail to initialize VectorGroupByCombiner.", e);
+      throw new RuntimeException(e.getCause());
+    }
+  }
+
+  // Get the reduce work from the config. Here some hack is used to prefix the config name with
+  // "file." to avoid the config being filtered out.
+  private ReduceWork getReduceWork() {
+    String plan =  conf.get(confPrefixForWorker + HiveConf.ConfVars.PLAN.varname);
+    this.conf.set(HiveConf.ConfVars.PLAN.varname, plan);
+    if (conf.getBoolean(confPrefixForWorker + HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname,
+            true)) {
+      Path planPath = new Path(plan);
+      planPath = new Path(planPath, REDUCE_PLAN_NAME);
+      String planString = conf.get(confPrefixForWorker + planPath.toUri().getPath());
+      this.conf.set(planPath.toUri().getPath(), planString);
+      this.conf.set(HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname, "true");
+    } else {
+      this.conf.set(HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN.varname, "false");
+    }
+    this.conf.set(HAS_REDUCE_WORK, "true");
+    this.conf.set(MAPRED_REDUCER_CLASS, ExecReducer.class.getName());
+
+    return Utilities.getReduceWork(conf);
+  }
+
+  private void initObjectInspectors(TableDesc valueTableDesc,TableDesc keyTableDesc)
+          throws SerDeException {
+    inputKeyDeserializer =
+            ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(), null);
+    SerDeUtils.initializeSerDe(inputKeyDeserializer, null,
+            keyTableDesc.getProperties(), null);
+    keyObjectInspector = inputKeyDeserializer.getObjectInspector();
+
+    keyStructInspector = (StructObjectInspector) keyObjectInspector;
+    firstValueColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
+
+    Deserializer inputValueDeserializer = (AbstractSerDe) ReflectionUtils.newInstance(
+            valueTableDesc.getDeserializerClass(), null);
+    SerDeUtils.initializeSerDe(inputValueDeserializer, null,
+            valueTableDesc.getProperties(), null);
+    valueObjectInspector = inputValueDeserializer.getObjectInspector();
+    valueStructInspectors = (StructObjectInspector) valueObjectInspector;
+    numValueCol = valueStructInspectors.getAllStructFieldRefs().size();
+  }
+
+  void initVectorBatches() throws HiveException {
+    inputBatch = batchContext.createVectorizedRowBatch();
+
+    // Create data buffers for value bytes column vectors.
+    for (int i = firstValueColumnOffset; i < inputBatch.numCols; i++) {
+      ColumnVector colVector = inputBatch.cols[i];
+      if (colVector instanceof BytesColumnVector) {
+        BytesColumnVector bytesColumnVector = (BytesColumnVector) colVector;
+        bytesColumnVector.initBuffer();
+      }
+    }
+
+    batchValueDeserializer =
+            new VectorDeserializeRow<>(
+                    new LazyBinaryDeserializeRead(
+                            VectorizedBatchUtil.typeInfosFromStructObjectInspector(
+                                    valueStructInspectors),
+                            true));
+    batchValueDeserializer.init(firstValueColumnOffset);
+
+    int[] valueColumnMap = new int[numValueCol];
+    for (int i = 0; i < numValueCol; i++) {
+      valueColumnMap[i] = i + firstValueColumnOffset;
+    }
+
+    valueLazyBinarySerializeWrite = new LazyBinarySerializeWrite(numValueCol);
+    valueVectorSerializeRow = new VectorSerializeRow<>(valueLazyBinarySerializeWrite);
+    valueVectorSerializeRow.init(VectorizedBatchUtil.typeInfosFromStructObjectInspector(
+            valueStructInspectors), valueColumnMap);
+    valueOutput = new ByteStream.Output();
+    valueVectorSerializeRow.setOutput(valueOutput);
+    outputBatch = batchContext.createVectorizedRowBatch();
+    valueBytesWritable = new DataInputBuffer();
+  }
+
+  private VectorAggregationBufferRow allocateAggregationBuffer() throws HiveException {
+    VectorAggregateExpression.AggregationBuffer[] aggregationBuffers =
+            new VectorAggregateExpression.AggregationBuffer[aggregators.length];
+    for (int i=0; i < aggregators.length; ++i) {
+      aggregationBuffers[i] = aggregators[i].getNewAggregationBuffer();
+      aggregators[i].reset(aggregationBuffers[i]);
+    }
+    return new VectorAggregationBufferRow(aggregationBuffers);
+  }
+
+  private void finishAggregation(DataInputBuffer key, IFile.Writer writer, boolean needFlush)
+          throws HiveException, IOException {
+    for (int i = 0; i < aggregators.length; ++i) {
+      try {
+        aggregators[i].aggregateInput(aggregationBufferRow.getAggregationBuffer(i), inputBatch);
+      } catch (HiveException e) {
+        throw new RuntimeException(e.getCause());
+      }
+    }
+
+    // In case the input batch is full but the keys are still same we need not flush.
+    // Only evaluate the aggregates and store it in the aggregationBufferRow. The aggregate
+    // functions are incremental and will take care of correctness when next batch comes for
+    // aggregation.
+    if (!needFlush) {
+      return;
+    }
+
+    int colNum = firstValueColumnOffset;
+    for (int i = 0; i < aggregators.length; ++i) {
+      aggregators[i].assignRowColumn(outputBatch, 0, colNum++,
+              aggregationBufferRow.getAggregationBuffer(i));
+    }
+
+    valueLazyBinarySerializeWrite.reset();
+    valueVectorSerializeRow.serializeWrite(outputBatch, 0);
+    valueBytesWritable.reset(valueOutput.getData(), 0, valueOutput.getLength());
+    writer.append(key, valueBytesWritable);
+    combineOutputRecordsCounter.increment(1);
+    aggregationBufferRow.reset();
+    outputBatch.reset();
+  }
+
+  private void addValueToBatch(DataInputBuffer val, DataInputBuffer key,
+                      IFile.Writer writer, boolean needFLush) throws IOException, HiveException {
+    batchValueDeserializer.setBytes(val.getData(), val.getPosition(),
+            val.getLength() - val.getPosition());
+    batchValueDeserializer.deserialize(inputBatch, inputBatch.size);
+    inputBatch.size++;
+    if (needFLush || (inputBatch.size >= VectorizedRowBatch.DEFAULT_SIZE)) {
+      processVectorGroup(key, writer, needFLush);
+    }
+  }
+
+  private void processVectorGroup(DataInputBuffer key, IFile.Writer writer, boolean needFlush)
+          throws HiveException {
+    try {
+      finishAggregation(key, writer, needFlush);
+      inputBatch.reset();
+    } catch (Exception e) {
+      String rowString;
+      try {
+        rowString = inputBatch.toString();
+      } catch (Exception e2) {
+        rowString = "[Error getting row data with exception "
+                + StringUtils.stringifyException(e2) + " ]";
+      }
+      LOG.error("Hive Runtime Error while processing vector batch" + rowString, e);
+      throw new HiveException("Hive Runtime Error while processing vector batch", e);
+    }
+  }
+
+  protected void appendDirectlyToWriter(TezRawKeyValueIterator rawIter, IFile.Writer writer) {
+    long numRows = 0;
+    try {
+      do {
+        numRows++;
+        writer.append(rawIter.getKey(), rawIter.getValue());
+      } while (rawIter.next());
+      combineInputRecordsCounter.increment(numRows);
+      combineOutputRecordsCounter.increment(numRows);
+    } catch(IOException e) {
+      LOG.error("Append to writer failed", e);
+      throw new RuntimeException(e.getMessage());
+    }
+  }
+
+  private void appendToWriter(DataInputBuffer val, DataInputBuffer key, IFile.Writer writer) {
+    try {
+      writer.append(key, val);
+      combineOutputRecordsCounter.increment(1);
+    } catch(IOException e) {
+      LOG.error("Append value list to writer failed", e);
+      throw new RuntimeException(e.getMessage());
+    }
+  }
+
+  public static int compare(DataInputBuffer buf1, DataInputBuffer buf2) {

Review comment:
       Makes sense. I will check for the equality.
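
A minimal sketch of the equality-only check being discussed (the helper name keysEqual is hypothetical; it relies only on the DataInputBuffer accessors already used in this diff, and mirrors the boolean-style compare(...) that the revised patch further down in this thread calls with a `!`):

    // Returns true only when the two serialized keys hold identical bytes.
    // Comparing remaining lengths first lets most non-matching keys bail out early.
    public static boolean keysEqual(DataInputBuffer buf1, DataInputBuffer buf2) {
      int len1 = buf1.getLength() - buf1.getPosition();
      int len2 = buf2.getLength() - buf2.getPosition();
      if (len1 != len2) {
        return false;
      }
      byte[] b1 = buf1.getData();
      byte[] b2 = buf2.getData();
      int off1 = buf1.getPosition();
      int off2 = buf2.getPosition();
      for (int i = 0; i < len1; i++) {
        if (b1[off1 + i] != b2[off2 + i]) {
          return false;
        }
      }
      return true;
    }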





[GitHub] [hive] maheshk114 commented on a change in pull request #1736: HIVE-24471 : Add support for combiner in hash mode group aggregation

Posted by GitBox <gi...@apache.org>.
maheshk114 commented on a change in pull request #1736:
URL: https://github.com/apache/hive/pull/1736#discussion_r539843194



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByCombiner.java
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByCombiner;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for normal group by operator. In case of map side aggregate, the partially
+// aggregated records are sorted based on group by key. If because of some reasons, like hash
+// table memory exceeded the limit or the first few batches of records have less ndvs, the
+// aggregation is not done, then here the aggregation can be done cheaply as the records
+// are sorted based on group by key.
+public class GroupByCombiner extends VectorGroupByCombiner {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+          org.apache.hadoop.hive.ql.exec.GroupByCombiner.class.getName());
+
+  private transient GenericUDAFEvaluator[] aggregationEvaluators;
+  Deserializer valueDeserializer;
+  GenericUDAFEvaluator.AggregationBuffer[] aggregationBuffers;
+  GroupByOperator groupByOperator;
+  Serializer valueSerializer;
+  ObjectInspector aggrObjectInspector;
+  DataInputBuffer valueBuffer;
+  Object[] cachedValues;
+
+  public GroupByCombiner(TaskContext taskContext) throws HiveException, IOException {
+    super(taskContext);
+    if (rw != null) {
+      try {
+        groupByOperator = (GroupByOperator) rw.getReducer();
+
+        ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+        ois.add(keyObjectInspector);
+        ois.add(valueObjectInspector);
+        ObjectInspector[] rowObjectInspector = new ObjectInspector[1];
+        rowObjectInspector[0] =
+            ObjectInspectorFactory.getStandardStructObjectInspector(Utilities.reduceFieldNameList,
+                        ois);
+        groupByOperator.setInputObjInspectors(rowObjectInspector);
+        groupByOperator.initializeOp(conf);
+        aggregationBuffers = groupByOperator.getAggregationBuffers();
+        aggregationEvaluators = groupByOperator.getAggregationEvaluator();
+
+        TableDesc valueTableDesc = rw.getTagToValueDesc().get(0);
+        valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
+                .newInstance();
+        valueSerializer.initialize(null, valueTableDesc.getProperties());
+
+        valueDeserializer = (AbstractSerDe) ReflectionUtils.newInstance(
+                valueTableDesc.getDeserializerClass(), null);
+        SerDeUtils.initializeSerDe(valueDeserializer, null,
+                valueTableDesc.getProperties(), null);
+
+        aggrObjectInspector = groupByOperator.getAggrObjInspector();
+        valueBuffer = new DataInputBuffer();
+        cachedValues = new Object[aggregationEvaluators.length];
+      } catch (Exception e) {
+        LOG.error(" GroupByCombiner failed", e);
+        throw new RuntimeException(e.getMessage());
+      }
+    }
+  }
+
+  private void processAggregation(IFile.Writer writer, DataInputBuffer key)
+          throws Exception {
+    for (int i = 0; i < aggregationEvaluators.length; i++) {
+      cachedValues[i] = aggregationEvaluators[i].evaluate(aggregationBuffers[i]);
+    }
+    BytesWritable result = (BytesWritable) valueSerializer.serialize(cachedValues,
+            aggrObjectInspector);
+    valueBuffer.reset(result.getBytes(), result.getLength());
+    writer.append(key, valueBuffer);
+    combineOutputRecordsCounter.increment(1);
+    for (int i = 0; i < aggregationEvaluators.length; i++) {
+      aggregationEvaluators[i].reset(aggregationBuffers[i]);
+    }
+  }
+
+  private void updateAggregation(BytesWritable valWritable, DataInputBuffer value)
+          throws HiveException, SerDeException {
+    valWritable.set(value.getData(), value.getPosition(),
+            value.getLength() - value.getPosition());
+    Object row = valueDeserializer.deserialize(valWritable);
+    groupByOperator.updateAggregation(row);
+  }
+
+  private void processRows(TezRawKeyValueIterator rawIter, IFile.Writer writer) {
+    long numRows = 0;
+    try {
+      DataInputBuffer key = rawIter.getKey();
+      DataInputBuffer prevKey = new DataInputBuffer();
+      prevKey.reset(key.getData(), key.getPosition(), key.getLength() - key.getPosition());
+      BytesWritable valWritable = new BytesWritable();
+      do {
+        key = rawIter.getKey();
+        // For first iteration, prevKey is always same as key.
+        if (VectorGroupByCombiner.compare(key, prevKey) != 0) {
+          processAggregation(writer, prevKey);
+          prevKey.reset(key.getData(), key.getPosition(), key.getLength() - key.getPosition());
+        }
+        updateAggregation(valWritable, rawIter.getValue());
+        numRows++;
+      } while (rawIter.next());

Review comment:
       I have fixed this in the next commit using numValues == 1.
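
For readers following along, a compressed sketch of that fast path (the counter name numValues is taken from the reply above; the revised diff later in this thread uses numSameKey for the same idea):

    if (numValues == 1) {
      // Only one record arrived for this key: copy it through untouched,
      // skipping the evaluate/serialize round-trip entirely.
      writer.append(prevKey, prevVal);
    } else {
      // Fold in the last buffered value, then emit the aggregated record.
      updateAggregation(valWritable, prevVal);
      processAggregation(writer, prevKey);
    }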





[GitHub] [hive] pgaref commented on a change in pull request #1736: HIVE-24471 : Add support for combiner in hash mode group aggregation

Posted by GitBox <gi...@apache.org>.
pgaref commented on a change in pull request #1736:
URL: https://github.com/apache/hive/pull/1736#discussion_r544256032



##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -1790,6 +1790,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     HIVEALIAS("hive.alias", "", ""),
     HIVEMAPSIDEAGGREGATE("hive.map.aggr", true, "Whether to use map-side aggregation in Hive Group By queries"),
     HIVEGROUPBYSKEW("hive.groupby.skewindata", false, "Whether there is skew in data to optimize group by queries"),
+
+    HIVE_ENABLE_COMBINER_FOR_GROUP_BY("hive.enable.combiner.for.groupby", true,
+        "Whether to enable tez combiner to aggregate the records after sorting is done"),

Review comment:
       Maybe clarify that it is only used for map-side aggregation? Is there any case where this would not be beneficial?
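
One possible rewording along those lines (a sketch only, not the merged text):

    HIVE_ENABLE_COMBINER_FOR_GROUP_BY("hive.enable.combiner.for.groupby", true,
        "Whether to run a Tez combiner that re-aggregates partially aggregated group-by "
            + "records after map-side sorting. Only relevant when map-side (hash) "
            + "aggregation could not fully aggregate the data before the sort."),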

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByCombiner.java
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByCombiner;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for normal group by operator. In case of map side aggregate, the partially
+// aggregated records are sorted based on group by key. If because of some reasons, like hash
+// table memory exceeded the limit or the first few batches of records have less ndvs, the
+// aggregation is not done, then here the aggregation can be done cheaply as the records
+// are sorted based on group by key.
+public class GroupByCombiner extends VectorGroupByCombiner {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+          org.apache.hadoop.hive.ql.exec.GroupByCombiner.class.getName());
+
+  private transient GenericUDAFEvaluator[] aggregationEvaluators;
+  Deserializer valueDeserializer;
+  GenericUDAFEvaluator.AggregationBuffer[] aggregationBuffers;
+  GroupByOperator groupByOperator;
+  Serializer valueSerializer;
+  ObjectInspector aggrObjectInspector;
+  DataInputBuffer valueBuffer;
+  Object[] cachedValues;
+
+  public GroupByCombiner(TaskContext taskContext) throws HiveException, IOException {
+    super(taskContext);
+    if (rw != null) {
+      try {
+        groupByOperator = (GroupByOperator) rw.getReducer();
+
+        ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+        ois.add(keyObjectInspector);
+        ois.add(valueObjectInspector);
+        ObjectInspector[] rowObjectInspector = new ObjectInspector[1];
+        rowObjectInspector[0] =
+            ObjectInspectorFactory.getStandardStructObjectInspector(Utilities.reduceFieldNameList,
+                        ois);
+        groupByOperator.setInputObjInspectors(rowObjectInspector);
+        groupByOperator.initializeOp(conf);
+        aggregationBuffers = groupByOperator.getAggregationBuffers();
+        aggregationEvaluators = groupByOperator.getAggregationEvaluator();
+
+        TableDesc valueTableDesc = rw.getTagToValueDesc().get(0);
+        if ((aggregationEvaluators == null) || (aggregationEvaluators.length != numValueCol)) {
+          //TODO : Need to support distinct. The logic has to be changed to extract only
+          // those aggregates which are not part of distinct.
+          LOG.info(" Combiner is disabled as the number of value columns does" +
+                  " not match with number of aggregators");
+          numValueCol = 0;
+          rw = null;
+          return;
+        }
+        valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
+                .newInstance();
+        valueSerializer.initialize(null, valueTableDesc.getProperties());
+
+        valueDeserializer = (AbstractSerDe) ReflectionUtils.newInstance(
+                valueTableDesc.getDeserializerClass(), null);
+        SerDeUtils.initializeSerDe(valueDeserializer, null,
+                valueTableDesc.getProperties(), null);
+
+        aggrObjectInspector = groupByOperator.getAggrObjInspector();
+        valueBuffer = new DataInputBuffer();
+        cachedValues = new Object[aggregationEvaluators.length];
+      } catch (Exception e) {
+        LOG.error(" GroupByCombiner failed", e);
+        throw new RuntimeException(e.getMessage());
+      }
+    }
+  }
+
+  private void processAggregation(IFile.Writer writer, DataInputBuffer key)
+          throws Exception {
+    for (int i = 0; i < aggregationEvaluators.length; i++) {
+      cachedValues[i] = aggregationEvaluators[i].evaluate(aggregationBuffers[i]);
+    }
+    BytesWritable result = (BytesWritable) valueSerializer.serialize(cachedValues,
+            aggrObjectInspector);
+    valueBuffer.reset(result.getBytes(), result.getLength());
+    writer.append(key, valueBuffer);
+    combineOutputRecordsCounter.increment(1);
+    for (int i = 0; i < aggregationEvaluators.length; i++) {
+      aggregationEvaluators[i].reset(aggregationBuffers[i]);
+    }
+  }
+
+  private void updateAggregation(BytesWritable valWritable, DataInputBuffer value)
+          throws HiveException, SerDeException {
+    valWritable.set(value.getData(), value.getPosition(),
+            value.getLength() - value.getPosition());
+    Object row = valueDeserializer.deserialize(valWritable);
+    groupByOperator.updateAggregation(row);
+  }
+
+  private void processRows(TezRawKeyValueIterator rawIter, IFile.Writer writer) {
+    long numRows = 0;
+    try {
+      DataInputBuffer key = rawIter.getKey();
+      DataInputBuffer prevKey = new DataInputBuffer();
+      prevKey.reset(key.getData(), key.getPosition(), key.getLength() - key.getPosition());
+      BytesWritable valWritable = new BytesWritable();
+      DataInputBuffer prevVal = new DataInputBuffer();
+      prevVal.reset(rawIter.getValue().getData(), rawIter.getValue().getPosition(),
+              rawIter.getValue().getLength() - rawIter.getValue().getPosition());
+      int numSameKey = 0;
+      while (rawIter.next()) {
+        key = rawIter.getKey();
+        if (!VectorGroupByCombiner.compare(key, prevKey)) {
+          // if current key is not equal to the previous key then we have to emit the
+          // record. In case only one record was present for this key, then no need to
+          // do aggregation, We can directly append the key and value. For key with more
+          // than one record, we have to update the aggregation for the current value only
+          // as for previous values (records) aggregation is already done in previous
+          // iteration of loop.
+          if (numSameKey != 0) {
+            updateAggregation(valWritable, prevVal);
+            processAggregation(writer, prevKey);
+          } else {
+            writer.append(prevKey, prevVal);
+          }
+          prevKey.reset(key.getData(), key.getPosition(),
+                  key.getLength() - key.getPosition());
+          numSameKey = 0;
+        } else {
+          // If there are more than one record with same key then update the aggregation.
+          updateAggregation(valWritable, prevVal);
+          numSameKey++;
+        }
+        prevVal.reset(rawIter.getValue().getData(), rawIter.getValue().getPosition(),
+                rawIter.getValue().getLength() - rawIter.getValue().getPosition());
+        numRows++;
+      }
+      if (numSameKey != 0) {
+        updateAggregation(valWritable, prevVal);
+        processAggregation(writer, prevKey);
+      } else {
+        writer.append(prevKey, prevVal);
+      }
+      combineInputRecordsCounter.increment(numRows);
+    } catch(Exception e) {
+      LOG.error("processRows failed", e);
+      throw new RuntimeException(e.getMessage());
+    }
+  }
+
+  @Override
+  public void combine(TezRawKeyValueIterator rawIter, IFile.Writer writer) {
+    try {
+      if (!rawIter.next()) {
+        return;
+      }
+      if (numValueCol == 0) {
+        // For no aggregation, RLE in writer will take care of reduction.
+        appendDirectlyToWriter(rawIter, writer);
+      } else {
+        processRows(rawIter, writer);
+      }
+    } catch (IOException e) {
+      LOG.error("Failed to combine rows", e);
+      throw new RuntimeException(e.getMessage());
+    }
+  }
+
+  public static JobConf setCombinerInConf(BaseWork dest, JobConf conf, JobConf destConf) {
+    //TODO need to change it to a proper config
+    if (conf == null || !conf.get(HiveConf.ConfVars.HIVE_ENABLE_COMBINER_FOR_GROUP_BY.varname).

Review comment:
       HiveConf.getBoolVar?
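
A sketch of that suggestion (only the condition is shown; the rest of the early-exit logic in setCombinerInConf is not part of this hunk and is elided here):

    // Let HiveConf parse the boolean instead of fetching and comparing the raw string.
    if (conf == null
        || !HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ENABLE_COMBINER_FOR_GROUP_BY)) {
      // ... same early-exit behavior as the current patch ...
    }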

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByCombiner.java
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByCombiner;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for normal group by operator. In case of map side aggregate, the partially
+// aggregated records are sorted based on group by key. If because of some reasons, like hash
+// table memory exceeded the limit or the first few batches of records have less ndvs, the
+// aggregation is not done, then here the aggregation can be done cheaply as the records
+// are sorted based on group by key.
+public class GroupByCombiner extends VectorGroupByCombiner {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+          org.apache.hadoop.hive.ql.exec.GroupByCombiner.class.getName());
+
+  private transient GenericUDAFEvaluator[] aggregationEvaluators;
+  Deserializer valueDeserializer;
+  GenericUDAFEvaluator.AggregationBuffer[] aggregationBuffers;
+  GroupByOperator groupByOperator;
+  Serializer valueSerializer;
+  ObjectInspector aggrObjectInspector;
+  DataInputBuffer valueBuffer;
+  Object[] cachedValues;
+
+  public GroupByCombiner(TaskContext taskContext) throws HiveException, IOException {
+    super(taskContext);
+    if (rw != null) {
+      try {
+        groupByOperator = (GroupByOperator) rw.getReducer();
+
+        ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+        ois.add(keyObjectInspector);
+        ois.add(valueObjectInspector);
+        ObjectInspector[] rowObjectInspector = new ObjectInspector[1];
+        rowObjectInspector[0] =
+            ObjectInspectorFactory.getStandardStructObjectInspector(Utilities.reduceFieldNameList,
+                        ois);
+        groupByOperator.setInputObjInspectors(rowObjectInspector);
+        groupByOperator.initializeOp(conf);
+        aggregationBuffers = groupByOperator.getAggregationBuffers();
+        aggregationEvaluators = groupByOperator.getAggregationEvaluator();
+
+        TableDesc valueTableDesc = rw.getTagToValueDesc().get(0);
+        if ((aggregationEvaluators == null) || (aggregationEvaluators.length != numValueCol)) {
+          //TODO : Need to support distinct. The logic has to be changed to extract only
+          // those aggregates which are not part of distinct.
+          LOG.info(" Combiner is disabled as the number of value columns does" +
+                  " not match with number of aggregators");
+          numValueCol = 0;
+          rw = null;
+          return;
+        }
+        valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
+                .newInstance();
+        valueSerializer.initialize(null, valueTableDesc.getProperties());
+
+        valueDeserializer = (AbstractSerDe) ReflectionUtils.newInstance(
+                valueTableDesc.getDeserializerClass(), null);
+        SerDeUtils.initializeSerDe(valueDeserializer, null,
+                valueTableDesc.getProperties(), null);
+
+        aggrObjectInspector = groupByOperator.getAggrObjInspector();
+        valueBuffer = new DataInputBuffer();
+        cachedValues = new Object[aggregationEvaluators.length];
+      } catch (Exception e) {
+        LOG.error(" GroupByCombiner failed", e);
+        throw new RuntimeException(e.getMessage());
+      }
+    }
+  }
+
+  private void processAggregation(IFile.Writer writer, DataInputBuffer key)
+          throws Exception {
+    for (int i = 0; i < aggregationEvaluators.length; i++) {
+      cachedValues[i] = aggregationEvaluators[i].evaluate(aggregationBuffers[i]);
+    }
+    BytesWritable result = (BytesWritable) valueSerializer.serialize(cachedValues,
+            aggrObjectInspector);
+    valueBuffer.reset(result.getBytes(), result.getLength());
+    writer.append(key, valueBuffer);
+    combineOutputRecordsCounter.increment(1);
+    for (int i = 0; i < aggregationEvaluators.length; i++) {
+      aggregationEvaluators[i].reset(aggregationBuffers[i]);
+    }
+  }
+
+  private void updateAggregation(BytesWritable valWritable, DataInputBuffer value)
+          throws HiveException, SerDeException {
+    valWritable.set(value.getData(), value.getPosition(),
+            value.getLength() - value.getPosition());
+    Object row = valueDeserializer.deserialize(valWritable);
+    groupByOperator.updateAggregation(row);
+  }
+
+  private void processRows(TezRawKeyValueIterator rawIter, IFile.Writer writer) {
+    long numRows = 0;
+    try {
+      DataInputBuffer key = rawIter.getKey();
+      DataInputBuffer prevKey = new DataInputBuffer();

Review comment:
       Shall we move prevKey, valWritable, and prevVal to class fields to avoid multiple instantiations?
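
A sketch of that refactor (field names follow the diff; this is not the committed change):

    // Scratch buffers allocated once per combiner instance and reused across calls.
    private final DataInputBuffer prevKey = new DataInputBuffer();
    private final DataInputBuffer prevVal = new DataInputBuffer();
    private final BytesWritable valWritable = new BytesWritable();

    private void processRows(TezRawKeyValueIterator rawIter, IFile.Writer writer) {
      DataInputBuffer key = rawIter.getKey();
      prevKey.reset(key.getData(), key.getPosition(), key.getLength() - key.getPosition());
      // ... loop body unchanged, reusing the fields above instead of local buffers ...
    }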

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -515,6 +516,10 @@ DAG build(JobConf conf, TezWork tezWork, Path scratchDir, Context ctx,
           Edge e = null;
 
           TezEdgeProperty edgeProp = tezWork.getEdgeProperty(workUnit, v);
+
+          //Add the reducer plan to config to create the combiner object in case of group by.
+          wxConf = GroupByCombiner.setCombinerInConf(v, wxConf, workToConf.get(v));

Review comment:
       Looks like all the combiner-creation logic should live a level higher -- would it make sense to have this as part of the createVertex method, for example? This looks hacky as is.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByCombiner.java
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByCombiner;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for normal group by operator. In case of map side aggregate, the partially
+// aggregated records are sorted based on group by key. If because of some reasons, like hash
+// table memory exceeded the limit or the first few batches of records have less ndvs, the
+// aggregation is not done, then here the aggregation can be done cheaply as the records
+// are sorted based on group by key.
+public class GroupByCombiner extends VectorGroupByCombiner {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+          org.apache.hadoop.hive.ql.exec.GroupByCombiner.class.getName());
+
+  private transient GenericUDAFEvaluator[] aggregationEvaluators;
+  Deserializer valueDeserializer;
+  GenericUDAFEvaluator.AggregationBuffer[] aggregationBuffers;
+  GroupByOperator groupByOperator;
+  Serializer valueSerializer;
+  ObjectInspector aggrObjectInspector;
+  DataInputBuffer valueBuffer;
+  Object[] cachedValues;
+
+  public GroupByCombiner(TaskContext taskContext) throws HiveException, IOException {
+    super(taskContext);
+    if (rw != null) {
+      try {
+        groupByOperator = (GroupByOperator) rw.getReducer();
+
+        ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+        ois.add(keyObjectInspector);
+        ois.add(valueObjectInspector);
+        ObjectInspector[] rowObjectInspector = new ObjectInspector[1];
+        rowObjectInspector[0] =
+            ObjectInspectorFactory.getStandardStructObjectInspector(Utilities.reduceFieldNameList,
+                        ois);
+        groupByOperator.setInputObjInspectors(rowObjectInspector);
+        groupByOperator.initializeOp(conf);
+        aggregationBuffers = groupByOperator.getAggregationBuffers();
+        aggregationEvaluators = groupByOperator.getAggregationEvaluator();
+
+        TableDesc valueTableDesc = rw.getTagToValueDesc().get(0);
+        if ((aggregationEvaluators == null) || (aggregationEvaluators.length != numValueCol)) {
+          //TODO : Need to support distinct. The logic has to be changed to extract only

Review comment:
       Shall we open a follow-up JIRA for this? What is the challenge here?





[GitHub] [hive] github-actions[bot] closed pull request #1736: HIVE-24471 : Add support for combiner in hash mode group aggregation

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #1736:
URL: https://github.com/apache/hive/pull/1736


   



[GitHub] [hive] maheshk114 commented on a change in pull request #1736: HIVE-24471 : Add support for combiner in hash mode group aggregation

Posted by GitBox <gi...@apache.org>.
maheshk114 commented on a change in pull request #1736:
URL: https://github.com/apache/hive/pull/1736#discussion_r539841639



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByCombiner.java
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByCombiner;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for normal group by operator. In case of map side aggregate, the partially
+// aggregated records are sorted based on group by key. If because of some reasons, like hash
+// table memory exceeded the limit or the first few batches of records have less ndvs, the
+// aggregation is not done, then here the aggregation can be done cheaply as the records
+// are sorted based on group by key.
+public class GroupByCombiner extends VectorGroupByCombiner {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+          org.apache.hadoop.hive.ql.exec.GroupByCombiner.class.getName());
+
+  private transient GenericUDAFEvaluator[] aggregationEvaluators;
+  Deserializer valueDeserializer;
+  GenericUDAFEvaluator.AggregationBuffer[] aggregationBuffers;
+  GroupByOperator groupByOperator;
+  Serializer valueSerializer;
+  ObjectInspector aggrObjectInspector;
+  DataInputBuffer valueBuffer;
+  Object[] cachedValues;
+
+  public GroupByCombiner(TaskContext taskContext) throws HiveException, IOException {
+    super(taskContext);
+    if (rw != null) {
+      try {
+        groupByOperator = (GroupByOperator) rw.getReducer();
+
+        ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+        ois.add(keyObjectInspector);
+        ois.add(valueObjectInspector);
+        ObjectInspector[] rowObjectInspector = new ObjectInspector[1];
+        rowObjectInspector[0] =
+            ObjectInspectorFactory.getStandardStructObjectInspector(Utilities.reduceFieldNameList,
+                        ois);
+        groupByOperator.setInputObjInspectors(rowObjectInspector);
+        groupByOperator.initializeOp(conf);
+        aggregationBuffers = groupByOperator.getAggregationBuffers();
+        aggregationEvaluators = groupByOperator.getAggregationEvaluator();
+
+        TableDesc valueTableDesc = rw.getTagToValueDesc().get(0);
+        valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
+                .newInstance();
+        valueSerializer.initialize(null, valueTableDesc.getProperties());
+
+        valueDeserializer = (AbstractSerDe) ReflectionUtils.newInstance(
+                valueTableDesc.getDeserializerClass(), null);
+        SerDeUtils.initializeSerDe(valueDeserializer, null,
+                valueTableDesc.getProperties(), null);
+
+        aggrObjectInspector = groupByOperator.getAggrObjInspector();
+        valueBuffer = new DataInputBuffer();
+        cachedValues = new Object[aggregationEvaluators.length];
+      } catch (Exception e) {
+        LOG.error(" GroupByCombiner failed", e);
+        throw new RuntimeException(e.getMessage());
+      }
+    }
+  }
+
+  private void processAggregation(IFile.Writer writer, DataInputBuffer key)
+          throws Exception {
+    for (int i = 0; i < aggregationEvaluators.length; i++) {
+      cachedValues[i] = aggregationEvaluators[i].evaluate(aggregationBuffers[i]);
+    }
+    BytesWritable result = (BytesWritable) valueSerializer.serialize(cachedValues,
+            aggrObjectInspector);
+    valueBuffer.reset(result.getBytes(), result.getLength());
+    writer.append(key, valueBuffer);
+    combineOutputRecordsCounter.increment(1);
+    for (int i = 0; i < aggregationEvaluators.length; i++) {
+      aggregationEvaluators[i].reset(aggregationBuffers[i]);
+    }
+  }
+
+  private void updateAggregation(BytesWritable valWritable, DataInputBuffer value)
+          throws HiveException, SerDeException {
+    valWritable.set(value.getData(), value.getPosition(),
+            value.getLength() - value.getPosition());
+    Object row = valueDeserializer.deserialize(valWritable);
+    groupByOperator.updateAggregation(row);
+  }
+
+  private void processRows(TezRawKeyValueIterator rawIter, IFile.Writer writer) {
+    long numRows = 0;
+    try {
+      DataInputBuffer key = rawIter.getKey();
+      DataInputBuffer prevKey = new DataInputBuffer();
+      prevKey.reset(key.getData(), key.getPosition(), key.getLength() - key.getPosition());
+      BytesWritable valWritable = new BytesWritable();
+      do {
+        key = rawIter.getKey();
+        // For first iteration, prevKey is always same as key.
+        if (VectorGroupByCombiner.compare(key, prevKey) != 0) {

Review comment:
       No, it is not one iterator per key. I have seen multiple keys in the same iterator.



