You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/08/02 05:35:17 UTC
[5/5] hive git commit: HIVE-11108: HashTableSinkOperator doesn't support vectorization [Spark Branch] (Rui reviewed by Xuefu)
HIVE-11108: HashTableSinkOperator doesn't support vectorization [Spark Branch] (Rui reviewed by Xuefu)
Conflicts:
ql/src/test/results/clientpositive/spark/vector_mapjoin_reduce.q.out
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a56d9f73
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a56d9f73
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a56d9f73
Branch: refs/heads/branch-1
Commit: a56d9f73930aa77c2c748978b0ef8c5605c25d7b
Parents: 1df842e
Author: Rui Li <ru...@intel.com>
Authored: Tue Jun 30 16:30:33 2015 +0800
Committer: xzhang <xz...@xzdt>
Committed: Sat Aug 1 20:33:27 2015 -0700
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/OperatorFactory.java | 3 +
.../ql/exec/SparkHashTableSinkOperator.java | 11 +-
.../VectorSparkHashTableSinkOperator.java | 104 +++++++++++++++++++
.../ql/optimizer/OperatorComparatorFactory.java | 3 +
.../physical/GenSparkSkewJoinProcessor.java | 2 +-
.../hive/ql/optimizer/physical/Vectorizer.java | 21 ++++
.../spark/SparkReduceSinkMapJoinProc.java | 2 +-
.../hive/ql/plan/SparkHashTableSinkDesc.java | 11 ++
.../spark/vector_decimal_mapjoin.q.out | 1 +
.../spark/vector_left_outer_join.q.out | 2 +
.../spark/vectorized_mapjoin.q.out | 1 +
.../spark/vectorized_nested_mapjoin.q.out | 2 +
12 files changed, 153 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/a56d9f73/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
index c4554a7..f58a10b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSparkHashTableSinkOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
@@ -141,6 +142,8 @@ public final class OperatorFactory {
vectorOpvec.add(new OpTuple<FileSinkDesc>(FileSinkDesc.class, VectorFileSinkOperator.class));
vectorOpvec.add(new OpTuple<FilterDesc>(FilterDesc.class, VectorFilterOperator.class));
vectorOpvec.add(new OpTuple<LimitDesc>(LimitDesc.class, VectorLimitOperator.class));
+ vectorOpvec.add(new OpTuple<SparkHashTableSinkDesc>(SparkHashTableSinkDesc.class,
+ VectorSparkHashTableSinkOperator.class));
}
private static final class OpTuple<T extends OperatorDesc> {
http://git-wip-us.apache.org/repos/asf/hive/blob/a56d9f73/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
index 7c67fd2..aa8808a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
@@ -53,9 +53,6 @@ public class SparkHashTableSinkOperator
private final HashTableSinkOperator htsOperator;
- // The position of this table
- private byte tag;
-
public SparkHashTableSinkOperator() {
htsOperator = new HashTableSinkOperator();
}
@@ -64,6 +61,7 @@ public class SparkHashTableSinkOperator
protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
Collection<Future<?>> result = super.initializeOp(hconf);
ObjectInspector[] inputOIs = new ObjectInspector[conf.getTagLength()];
+ byte tag = conf.getTag();
inputOIs[tag] = inputObjInspectors[0];
conf.setTagOrder(new Byte[]{ tag });
htsOperator.setConf(conf);
@@ -74,13 +72,14 @@ public class SparkHashTableSinkOperator
@Override
public void process(Object row, int tag) throws HiveException {
// Ignore the tag passed in, which should be 0, not what we want
- htsOperator.process(row, this.tag);
+ htsOperator.process(row, conf.getTag());
}
@Override
public void closeOp(boolean abort) throws HiveException {
try {
MapJoinPersistableTableContainer[] mapJoinTables = htsOperator.mapJoinTables;
+ byte tag = conf.getTag();
if (mapJoinTables == null || mapJoinTables.length < tag
|| mapJoinTables[tag] == null) {
LOG.debug("mapJoinTable is null");
@@ -177,10 +176,6 @@ public class SparkHashTableSinkOperator
tableContainer.clear();
}
- public void setTag(byte tag) {
- this.tag = tag;
- }
-
/**
* Implements the getName function for the Node Interface.
*
http://git-wip-us.apache.org/repos/asf/hive/blob/a56d9f73/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java
new file mode 100644
index 0000000..6b9ac26
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.SparkHashTableSinkOperator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+import java.util.Collection;
+import java.util.concurrent.Future;
+
+/**
+ * Vectorized version of SparkHashTableSinkOperator
+ * Currently the implementation just delegates all the work to super class
+ *
+ * Copied from VectorFileSinkOperator
+ */
+public class VectorSparkHashTableSinkOperator extends SparkHashTableSinkOperator {
+
+ private static final long serialVersionUID = 1L;
+
+ private VectorizationContext vContext;
+
+ // The above members are initialized by the constructor and must not be
+ // transient.
+ //---------------------------------------------------------------------------
+
+ private transient boolean firstBatch;
+
+ private transient VectorExtractRowDynBatch vectorExtractRowDynBatch;
+
+ protected transient Object[] singleRow;
+
+ public VectorSparkHashTableSinkOperator() {
+ }
+
+ public VectorSparkHashTableSinkOperator(VectorizationContext vContext, OperatorDesc conf) {
+ super();
+ this.vContext = vContext;
+ this.conf = (SparkHashTableSinkDesc) conf;
+ }
+
+ @Override
+ protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+ inputObjInspectors[0] =
+ VectorizedBatchUtil.convertToStandardStructObjectInspector((StructObjectInspector) inputObjInspectors[0]);
+
+ Collection<Future<?>> result = super.initializeOp(hconf);
+ assert result.isEmpty();
+
+ firstBatch = true;
+
+ return result;
+ }
+
+ @Override
+ public void process(Object row, int tag) throws HiveException {
+ VectorizedRowBatch batch = (VectorizedRowBatch) row;
+
+ if (firstBatch) {
+ vectorExtractRowDynBatch = new VectorExtractRowDynBatch();
+ vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns());
+
+ singleRow = new Object[vectorExtractRowDynBatch.getCount()];
+
+ firstBatch = false;
+ }
+ vectorExtractRowDynBatch.setBatchOnEntry(batch);
+ if (batch.selectedInUse) {
+ int selected[] = batch.selected;
+ for (int logical = 0 ; logical < batch.size; logical++) {
+ int batchIndex = selected[logical];
+ vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+ super.process(singleRow, tag);
+ }
+ } else {
+ for (int batchIndex = 0 ; batchIndex < batch.size; batchIndex++) {
+ vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+ super.process(singleRow, tag);
+ }
+ }
+
+ vectorExtractRowDynBatch.forgetBatchOnExit();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/a56d9f73/ql/src/java/org/apache/hadoop/hive/ql/optimizer/OperatorComparatorFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/OperatorComparatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/OperatorComparatorFactory.java
index d93bd72..3518823 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/OperatorComparatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/OperatorComparatorFactory.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorLimitOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSparkHashTableSinkOperator;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.FilterDesc;
@@ -88,6 +89,8 @@ public class OperatorComparatorFactory {
comparatorMapping.put(SMBMapJoinOperator.class, new SMBMapJoinOperatorComparator());
comparatorMapping.put(LimitOperator.class, new LimitOperatorComparator());
comparatorMapping.put(SparkHashTableSinkOperator.class, new SparkHashTableSinkOperatorComparator());
+ comparatorMapping.put(VectorSparkHashTableSinkOperator.class,
+ new SparkHashTableSinkOperatorComparator());
comparatorMapping.put(LateralViewJoinOperator.class, new LateralViewJoinOperatorComparator());
comparatorMapping.put(VectorGroupByOperator.class, new GroupByOperatorComparator());
comparatorMapping.put(CommonMergeJoinOperator.class, new MapJoinOperatorComparator());
http://git-wip-us.apache.org/repos/asf/hive/blob/a56d9f73/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
index 7ebd18d..f88fd0a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
@@ -390,7 +390,7 @@ public class GenSparkSkewJoinProcessor {
new ArrayList<Operator<? extends OperatorDesc>>();
tableScanParents.add(tableScan);
hashTableSinkOp.setParentOperators(tableScanParents);
- hashTableSinkOp.setTag(tag);
+ hashTableSinkOp.getConf().setTag(tag);
}
private static void setMemUsage(MapJoinOperator mapJoinOp, Task<? extends Serializable> task,
http://git-wip-us.apache.org/repos/asf/hive/blob/a56d9f73/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index e7b9c73..ad47547 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -89,6 +89,7 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
+import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.plan.TezWork;
@@ -920,6 +921,10 @@ public class Vectorizer implements PhysicalPlanResolver {
case EVENT:
ret = true;
break;
+ case HASHTABLESINK:
+ ret = op instanceof SparkHashTableSinkOperator &&
+ validateSparkHashTableSinkOperator((SparkHashTableSinkOperator) op);
+ break;
default:
ret = false;
break;
@@ -962,6 +967,10 @@ public class Vectorizer implements PhysicalPlanResolver {
case EVENT:
ret = true;
break;
+ case HASHTABLESINK:
+ ret = op instanceof SparkHashTableSinkOperator &&
+ validateSparkHashTableSinkOperator((SparkHashTableSinkOperator) op);
+ break;
default:
ret = false;
break;
@@ -1085,6 +1094,17 @@ public class Vectorizer implements PhysicalPlanResolver {
return true;
}
+ private boolean validateSparkHashTableSinkOperator(SparkHashTableSinkOperator op) {
+ SparkHashTableSinkDesc desc = op.getConf();
+ byte tag = desc.getTag();
+ // it's essentially a MapJoinDesc
+ List<ExprNodeDesc> filterExprs = desc.getFilters().get(tag);
+ List<ExprNodeDesc> keyExprs = desc.getKeys().get(tag);
+ List<ExprNodeDesc> valueExprs = desc.getExprs().get(tag);
+ return validateExprNodeDesc(filterExprs, VectorExpressionDescriptor.Mode.FILTER) &&
+ validateExprNodeDesc(keyExprs) && validateExprNodeDesc(valueExprs);
+ }
+
private boolean validateReduceSinkOperator(ReduceSinkOperator op) {
List<ExprNodeDesc> keyDescs = op.getConf().getKeyCols();
List<ExprNodeDesc> partitionDescs = op.getConf().getPartitionCols();
@@ -1671,6 +1691,7 @@ public class Vectorizer implements PhysicalPlanResolver {
case LIMIT:
case EXTRACT:
case EVENT:
+ case HASHTABLESINK:
vectorOp = OperatorFactory.getVectorOperator(op.getConf(), vContext);
break;
default:
http://git-wip-us.apache.org/repos/asf/hive/blob/a56d9f73/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
index fd42959..76517e4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
@@ -283,7 +283,7 @@ public class SparkReduceSinkMapJoinProc implements NodeProcessor {
parent.replaceChild(parentRS, hashTableSinkOp);
}
hashTableSinkOp.setParentOperators(rsParentOps);
- hashTableSinkOp.setTag(tag);
+ hashTableSinkOp.getConf().setTag(tag);
return true;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/a56d9f73/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkHashTableSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkHashTableSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkHashTableSinkDesc.java
index ff32f5e..8833ae3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkHashTableSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkHashTableSinkDesc.java
@@ -26,10 +26,21 @@ package org.apache.hadoop.hive.ql.plan;
public class SparkHashTableSinkDesc extends HashTableSinkDesc {
private static final long serialVersionUID = 1L;
+ // The position of this table
+ private byte tag;
+
public SparkHashTableSinkDesc() {
}
public SparkHashTableSinkDesc(MapJoinDesc clone) {
super(clone);
}
+
+ public byte getTag() {
+ return tag;
+ }
+
+ public void setTag(byte tag) {
+ this.tag = tag;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/a56d9f73/ql/src/test/results/clientpositive/spark/vector_decimal_mapjoin.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/vector_decimal_mapjoin.q.out b/ql/src/test/results/clientpositive/spark/vector_decimal_mapjoin.q.out
index a80a20b..0ab301b 100644
--- a/ql/src/test/results/clientpositive/spark/vector_decimal_mapjoin.q.out
+++ b/ql/src/test/results/clientpositive/spark/vector_decimal_mapjoin.q.out
@@ -102,6 +102,7 @@ STAGE PLANS:
1 dec (type: decimal(6,2))
Local Work:
Map Reduce Local Work
+ Execution mode: vectorized
Stage: Stage-1
Spark
http://git-wip-us.apache.org/repos/asf/hive/blob/a56d9f73/ql/src/test/results/clientpositive/spark/vector_left_outer_join.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/vector_left_outer_join.q.out b/ql/src/test/results/clientpositive/spark/vector_left_outer_join.q.out
index a0e6c2a..024be1c 100644
--- a/ql/src/test/results/clientpositive/spark/vector_left_outer_join.q.out
+++ b/ql/src/test/results/clientpositive/spark/vector_left_outer_join.q.out
@@ -41,6 +41,7 @@ STAGE PLANS:
1 _col0 (type: int)
Local Work:
Map Reduce Local Work
+ Execution mode: vectorized
Map 4
Map Operator Tree:
TableScan
@@ -56,6 +57,7 @@ STAGE PLANS:
1 _col0 (type: tinyint)
Local Work:
Map Reduce Local Work
+ Execution mode: vectorized
Stage: Stage-1
Spark
http://git-wip-us.apache.org/repos/asf/hive/blob/a56d9f73/ql/src/test/results/clientpositive/spark/vectorized_mapjoin.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/vectorized_mapjoin.q.out b/ql/src/test/results/clientpositive/spark/vectorized_mapjoin.q.out
index 78b0b37..ecfe42b 100644
--- a/ql/src/test/results/clientpositive/spark/vectorized_mapjoin.q.out
+++ b/ql/src/test/results/clientpositive/spark/vectorized_mapjoin.q.out
@@ -34,6 +34,7 @@ STAGE PLANS:
1 cint (type: int)
Local Work:
Map Reduce Local Work
+ Execution mode: vectorized
Stage: Stage-1
Spark
http://git-wip-us.apache.org/repos/asf/hive/blob/a56d9f73/ql/src/test/results/clientpositive/spark/vectorized_nested_mapjoin.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/vectorized_nested_mapjoin.q.out b/ql/src/test/results/clientpositive/spark/vectorized_nested_mapjoin.q.out
index a25d540..7ba64b7 100644
--- a/ql/src/test/results/clientpositive/spark/vectorized_nested_mapjoin.q.out
+++ b/ql/src/test/results/clientpositive/spark/vectorized_nested_mapjoin.q.out
@@ -34,6 +34,7 @@ STAGE PLANS:
1 _col0 (type: tinyint)
Local Work:
Map Reduce Local Work
+ Execution mode: vectorized
Map 4
Map Operator Tree:
TableScan
@@ -52,6 +53,7 @@ STAGE PLANS:
1 _col0 (type: smallint)
Local Work:
Map Reduce Local Work
+ Execution mode: vectorized
Stage: Stage-1
Spark