You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by go...@apache.org on 2019/03/07 01:10:54 UTC
[hive] branch master updated: HIVE-21294: Vectorization: 1-reducer Shuffle can skip the object hash functions (Teddy Choi, reviewed by Gopal V)
This is an automated email from the ASF dual-hosted git repository.
gopalv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 84f766e HIVE-21294: Vectorization: 1-reducer Shuffle can skip the object hash functions (Teddy Choi, reviewed by Gopal V)
84f766e is described below
commit 84f766e79f4d28dbd5b59067c40741fbff297aa7
Author: Teddy Choi <tc...@apache.org>
AuthorDate: Wed Mar 6 17:10:32 2019 -0800
HIVE-21294: Vectorization: 1-reducer Shuffle can skip the object hash functions (Teddy Choi, reviewed by Gopal V)
Signed-off-by: Gopal V <go...@apache.org>
---
.../VectorReduceSinkObjectHashOperator.java | 65 ++++++++++------------
1 file changed, 28 insertions(+), 37 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java
index 767df21..ef5ca02 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java
@@ -64,6 +64,8 @@ public class VectorReduceSinkObjectHashOperator extends VectorReduceSinkCommonOp
protected int[] reduceSinkPartitionColumnMap;
protected TypeInfo[] reduceSinkPartitionTypeInfos;
+ private boolean isSingleReducer;
+
protected VectorExpression[] reduceSinkPartitionExpressions;
// The above members are initialized by the constructor and must not be
@@ -119,6 +121,8 @@ public class VectorReduceSinkObjectHashOperator extends VectorReduceSinkCommonOp
reduceSinkPartitionTypeInfos = vectorReduceSinkInfo.getReduceSinkPartitionTypeInfos();
reduceSinkPartitionExpressions = vectorReduceSinkInfo.getReduceSinkPartitionExpressions();
}
+
+ isSingleReducer = this.conf.getNumReducers() == 1;
}
private ObjectInspector[] getObjectInspectorArray(TypeInfo[] typeInfos) {
@@ -255,48 +259,35 @@ public class VectorReduceSinkObjectHashOperator extends VectorReduceSinkCommonOp
final int size = batch.size;
- if (isEmptyBuckets) { // EmptyBuckets = true
- if (isEmptyPartitions) { // isEmptyPartition = true
- for (int logical = 0; logical< size; logical++) {
- final int batchIndex = (selectedInUse ? selected[logical] : logical);
- final int hashCode = nonPartitionRandom.nextInt();
- postProcess(batch, batchIndex, tag, hashCode);
- }
- } else { // isEmptyPartition = false
- for (int logical = 0; logical< size; logical++) {
- final int batchIndex = (selectedInUse ? selected[logical] : logical);
- partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues);
- final int hashCode = hashFunc.apply(partitionFieldValues, partitionObjectInspectors);
- postProcess(batch, batchIndex, tag, hashCode);
+ for (int logical = 0; logical< size; logical++) {
+ final int batchIndex = (selectedInUse ? selected[logical] : logical);
+ int hashCode;
+ if (isEmptyPartitions) {
+ if (isSingleReducer) {
+ // Empty partition, single reducer -> constant hashCode
+ hashCode = 0;
+ } else {
+ // Empty partition, multiple reducers -> random hashCode
+ hashCode = nonPartitionRandom.nextInt();
}
+ } else {
+ // Compute hashCode from partitions
+ partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues);
+ hashCode = hashFunc.apply(partitionFieldValues, partitionObjectInspectors);
}
- } else { // EmptyBuckets = false
- if (isEmptyPartitions) { // isEmptyPartition = true
- for (int logical = 0; logical< size; logical++) {
- final int batchIndex = (selectedInUse ? selected[logical] : logical);
- bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues);
- final int bucketNum = ObjectInspectorUtils.getBucketNumber(
- hashFunc.apply(bucketFieldValues, bucketObjectInspectors), numBuckets);
- final int hashCode = nonPartitionRandom.nextInt() * 31 + bucketNum;
- if (bucketExpr != null) {
- evaluateBucketExpr(batch, batchIndex, bucketNum);
- }
- postProcess(batch, batchIndex, tag, hashCode);
- }
- } else { // isEmptyPartition = false
- for (int logical = 0; logical< size; logical++) {
- final int batchIndex = (selectedInUse ? selected[logical] : logical);
- partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues);
- bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues);
- final int bucketNum = ObjectInspectorUtils.getBucketNumber(
+
+ // Compute hashCode from buckets
+ if (!isEmptyBuckets) {
+ bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues);
+ final int bucketNum = ObjectInspectorUtils.getBucketNumber(
hashFunc.apply(bucketFieldValues, bucketObjectInspectors), numBuckets);
- final int hashCode = hashFunc.apply(partitionFieldValues, partitionObjectInspectors) * 31 + bucketNum;
- if (bucketExpr != null) {
- evaluateBucketExpr(batch, batchIndex, bucketNum);
- }
- postProcess(batch, batchIndex, tag, hashCode);
+ if (bucketExpr != null) {
+ evaluateBucketExpr(batch, batchIndex, bucketNum);
}
+ hashCode = hashCode * 31 + bucketNum;
}
+
+ postProcess(batch, batchIndex, tag, hashCode);
}
} catch (Exception e) {
throw new HiveException(e);