Posted to commits@hive.apache.org by go...@apache.org on 2014/10/02 05:20:00 UTC

svn commit: r1628882 - /hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java

Author: gopalv
Date: Thu Oct  2 03:20:00 2014
New Revision: 1628882

URL: http://svn.apache.org/r1628882
Log:
HIVE-8296: Reduce vectorization should use independent buffers for key and value (Matt McCline, via Gopal V)
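
The commit splits the single scratch DataOutputBuffer into a key buffer and a value buffer so that resetting the value side between batches cannot invalidate the key bytes. Below is a minimal, hypothetical sketch (not the committed Hive code) of the hazard with a shared buffer: a consumer that remembers an (offset, length) into the buffer's backing array, the way a batch column would, sees its key bytes overwritten once the buffer is reset and reused for the next batch of values. DataOutputBuffer is the real org.apache.hadoop.io class; the class and variable names here are illustrative only.

    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.io.DataOutputBuffer;

    public class SharedBufferSketch {
      public static void main(String[] args) throws Exception {
        DataOutputBuffer shared = new DataOutputBuffer();

        // "Deserialize" the key once per key group; remember where its bytes live.
        shared.writeBytes("key-1");
        int keyOffset = 0;
        int keyLength = shared.getLength();

        // Values for the first batch are appended to the same buffer.
        shared.writeBytes("value-a");

        // Between batches the shared buffer is reset so it can be reused...
        shared.reset();
        // ...and the next batch's values overwrite the key's bytes in place.
        shared.writeBytes("value-b");

        // The stored (keyOffset, keyLength) no longer refers to the key.
        String whatTheKeyColumnSees =
            new String(shared.getData(), keyOffset, keyLength, StandardCharsets.UTF_8);
        System.out.println(whatTheKeyColumnSees);   // prints "value", not "key-1"
      }
    }

With independent buffers, as in the diff below, valueBuffer.reset() between batches leaves the key bytes in keyBuffer untouched for the lifetime of the key group, and both buffers are reset only once the whole group has been processed.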

Modified:
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java?rev=1628882&r1=1628881&r2=1628882&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java Thu Oct  2 03:20:00 2014
@@ -86,7 +86,8 @@ public class ReduceRecordSource implemen
 
   List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
 
-  private DataOutputBuffer buffer;
+  private DataOutputBuffer keyBuffer;
+  private DataOutputBuffer valueBuffer;
   private VectorizedRowBatchCtx batchContext;
   private VectorizedRowBatch batch;
 
@@ -135,7 +136,8 @@ public class ReduceRecordSource implemen
       if(vectorized) {
         keyStructInspector = (StructObjectInspector) keyObjectInspector;
         keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
-        buffer = new DataOutputBuffer();
+        keyBuffer = new DataOutputBuffer();
+        valueBuffer = new DataOutputBuffer();
       }
 
       // We should initialize the SerDe with the TypeInfo when available.
@@ -323,11 +325,9 @@ public class ReduceRecordSource implemen
    * @return true if it is not done and can take more inputs
    */
   private void processVectors(Iterable<Object> values, byte tag) throws HiveException {
-    batch.reset();
-
     /* deserialize key into columns */
     VectorizedBatchUtil.addRowToBatchFrom(keyObject, keyStructInspector,
-        0, 0, batch, buffer);
+        0, 0, batch, keyBuffer);
     for(int i = 0; i < keysColumnOffset; i++) {
       VectorizedBatchUtil.setRepeatingColumn(batch, i);
     }
@@ -340,18 +340,28 @@ public class ReduceRecordSource implemen
         Object valueObj = deserializeValue(valueWritable, tag);
 
         VectorizedBatchUtil.addRowToBatchFrom(valueObj, valueStructInspectors,
-            rowIdx, keysColumnOffset, batch, buffer);
+            rowIdx, keysColumnOffset, batch, valueBuffer);
         rowIdx++;
         if (rowIdx >= BATCH_SIZE) {
           VectorizedBatchUtil.setBatchSize(batch, rowIdx);
           reducer.processOp(batch, tag);
+
+          // Reset just the value columns and value buffer.
+          for (int i = keysColumnOffset; i < batch.numCols; i++) {
+            batch.cols[i].reset();
+          }
+          valueBuffer.reset();
           rowIdx = 0;
         }
       }
       if (rowIdx > 0) {
+        // Flush final partial batch.
         VectorizedBatchUtil.setBatchSize(batch, rowIdx);
         reducer.processOp(batch, tag);
       }
+      batch.reset();
+      keyBuffer.reset();
+      valueBuffer.reset();
     } catch (Exception e) {
       String rowString = null;
       try {