You are viewing a plain text version of this content; the canonical (formatted) version is available at the original mailing-list archive link, which was not preserved in this extraction.
Posted to commits@hive.apache.org by go...@apache.org on 2014/10/02 05:20:00 UTC
svn commit: r1628882 -
/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
Author: gopalv
Date: Thu Oct 2 03:20:00 2014
New Revision: 1628882
URL: http://svn.apache.org/r1628882
Log:
HIVE-8296: Reduce vectorization should use independent buffers for key and value (Matt McCline, via Gopal V)
Modified:
hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java?rev=1628882&r1=1628881&r2=1628882&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java Thu Oct 2 03:20:00 2014
@@ -86,7 +86,8 @@ public class ReduceRecordSource implemen
List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
- private DataOutputBuffer buffer;
+ private DataOutputBuffer keyBuffer;
+ private DataOutputBuffer valueBuffer;
private VectorizedRowBatchCtx batchContext;
private VectorizedRowBatch batch;
@@ -135,7 +136,8 @@ public class ReduceRecordSource implemen
if(vectorized) {
keyStructInspector = (StructObjectInspector) keyObjectInspector;
keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
- buffer = new DataOutputBuffer();
+ keyBuffer = new DataOutputBuffer();
+ valueBuffer = new DataOutputBuffer();
}
// We should initialize the SerDe with the TypeInfo when available.
@@ -323,11 +325,9 @@ public class ReduceRecordSource implemen
* @return true if it is not done and can take more inputs
*/
private void processVectors(Iterable<Object> values, byte tag) throws HiveException {
- batch.reset();
-
/* deserialize key into columns */
VectorizedBatchUtil.addRowToBatchFrom(keyObject, keyStructInspector,
- 0, 0, batch, buffer);
+ 0, 0, batch, keyBuffer);
for(int i = 0; i < keysColumnOffset; i++) {
VectorizedBatchUtil.setRepeatingColumn(batch, i);
}
@@ -340,18 +340,28 @@ public class ReduceRecordSource implemen
Object valueObj = deserializeValue(valueWritable, tag);
VectorizedBatchUtil.addRowToBatchFrom(valueObj, valueStructInspectors,
- rowIdx, keysColumnOffset, batch, buffer);
+ rowIdx, keysColumnOffset, batch, valueBuffer);
rowIdx++;
if (rowIdx >= BATCH_SIZE) {
VectorizedBatchUtil.setBatchSize(batch, rowIdx);
reducer.processOp(batch, tag);
+
+ // Reset just the value columns and value buffer.
+ for (int i = keysColumnOffset; i < batch.numCols; i++) {
+ batch.cols[i].reset();
+ }
+ valueBuffer.reset();
rowIdx = 0;
}
}
if (rowIdx > 0) {
+ // Flush final partial batch.
VectorizedBatchUtil.setBatchSize(batch, rowIdx);
reducer.processOp(batch, tag);
}
+ batch.reset();
+ keyBuffer.reset();
+ valueBuffer.reset();
} catch (Exception e) {
String rowString = null;
try {