You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/08/02 05:35:17 UTC

[5/5] hive git commit: HIVE-11108: HashTableSinkOperator doesn't support vectorization [Spark Branch] (Rui reviewed by Xuefu)

HIVE-11108: HashTableSinkOperator doesn't support vectorization [Spark Branch] (Rui reviewed by Xuefu)

Conflicts:

	ql/src/test/results/clientpositive/spark/vector_mapjoin_reduce.q.out


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a56d9f73
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a56d9f73
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a56d9f73

Branch: refs/heads/branch-1
Commit: a56d9f73930aa77c2c748978b0ef8c5605c25d7b
Parents: 1df842e
Author: Rui Li <ru...@intel.com>
Authored: Tue Jun 30 16:30:33 2015 +0800
Committer: xzhang <xz...@xzdt>
Committed: Sat Aug 1 20:33:27 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/OperatorFactory.java    |   3 +
 .../ql/exec/SparkHashTableSinkOperator.java     |  11 +-
 .../VectorSparkHashTableSinkOperator.java       | 104 +++++++++++++++++++
 .../ql/optimizer/OperatorComparatorFactory.java |   3 +
 .../physical/GenSparkSkewJoinProcessor.java     |   2 +-
 .../hive/ql/optimizer/physical/Vectorizer.java  |  21 ++++
 .../spark/SparkReduceSinkMapJoinProc.java       |   2 +-
 .../hive/ql/plan/SparkHashTableSinkDesc.java    |  11 ++
 .../spark/vector_decimal_mapjoin.q.out          |   1 +
 .../spark/vector_left_outer_join.q.out          |   2 +
 .../spark/vectorized_mapjoin.q.out              |   1 +
 .../spark/vectorized_nested_mapjoin.q.out       |   2 +
 12 files changed, 153 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a56d9f73/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
index c4554a7..f58a10b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSparkHashTableSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
@@ -141,6 +142,8 @@ public final class OperatorFactory {
     vectorOpvec.add(new OpTuple<FileSinkDesc>(FileSinkDesc.class, VectorFileSinkOperator.class));
     vectorOpvec.add(new OpTuple<FilterDesc>(FilterDesc.class, VectorFilterOperator.class));
     vectorOpvec.add(new OpTuple<LimitDesc>(LimitDesc.class, VectorLimitOperator.class));
+    vectorOpvec.add(new OpTuple<SparkHashTableSinkDesc>(SparkHashTableSinkDesc.class,
+        VectorSparkHashTableSinkOperator.class));
   }
 
   private static final class OpTuple<T extends OperatorDesc> {

http://git-wip-us.apache.org/repos/asf/hive/blob/a56d9f73/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
index 7c67fd2..aa8808a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
@@ -53,9 +53,6 @@ public class SparkHashTableSinkOperator
 
   private final HashTableSinkOperator htsOperator;
 
-  // The position of this table
-  private byte tag;
-
   public SparkHashTableSinkOperator() {
     htsOperator = new HashTableSinkOperator();
   }
@@ -64,6 +61,7 @@ public class SparkHashTableSinkOperator
   protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
     Collection<Future<?>> result = super.initializeOp(hconf);
     ObjectInspector[] inputOIs = new ObjectInspector[conf.getTagLength()];
+    byte tag = conf.getTag();
     inputOIs[tag] = inputObjInspectors[0];
     conf.setTagOrder(new Byte[]{ tag });
     htsOperator.setConf(conf);
@@ -74,13 +72,14 @@ public class SparkHashTableSinkOperator
   @Override
   public void process(Object row, int tag) throws HiveException {
     // Ignore the tag passed in, which should be 0, not what we want
-    htsOperator.process(row, this.tag);
+    htsOperator.process(row, conf.getTag());
   }
 
   @Override
   public void closeOp(boolean abort) throws HiveException {
     try {
       MapJoinPersistableTableContainer[] mapJoinTables = htsOperator.mapJoinTables;
+      byte tag = conf.getTag();
       if (mapJoinTables == null || mapJoinTables.length < tag
           || mapJoinTables[tag] == null) {
         LOG.debug("mapJoinTable is null");
@@ -177,10 +176,6 @@ public class SparkHashTableSinkOperator
     tableContainer.clear();
   }
 
-  public void setTag(byte tag) {
-    this.tag = tag;
-  }
-
   /**
    * Implements the getName function for the Node Interface.
    *

http://git-wip-us.apache.org/repos/asf/hive/blob/a56d9f73/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java
new file mode 100644
index 0000000..6b9ac26
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.SparkHashTableSinkOperator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+import java.util.Collection;
+import java.util.concurrent.Future;
+
+/**
+ * Vectorized version of {@link SparkHashTableSinkOperator}. It adds no vectorized hash-table
+ * logic of its own: each row of an incoming VectorizedRowBatch is extracted into an Object[]
+ * and handed to the row-mode super class, so all real work is still delegated.
+ * The structure is copied from VectorFileSinkOperator.
+ */
+public class VectorSparkHashTableSinkOperator extends SparkHashTableSinkOperator {
+
+  private static final long serialVersionUID = 1L;
+
+  private VectorizationContext vContext;
+
+  // vContext above is initialized by the constructor and must not be transient.
+  // The transient fields below are set up in initializeOp() and on the first batch.
+  //---------------------------------------------------------------------------
+
+  private transient boolean firstBatch; // true until process() has built the row extractor
+
+  private transient VectorExtractRowDynBatch vectorExtractRowDynBatch;
+
+  protected transient Object[] singleRow; // reused buffer for the row currently being extracted
+
+  public VectorSparkHashTableSinkOperator() { // NOTE(review): presumably for plan (de)serialization - confirm
+  }
+
+  public VectorSparkHashTableSinkOperator(VectorizationContext vContext, OperatorDesc conf) {
+    super();
+    this.vContext = vContext;
+    this.conf = (SparkHashTableSinkDesc) conf; // ClassCastException if conf is not a SparkHashTableSinkDesc
+  }
+
+  @Override
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    inputObjInspectors[0] = // swapped in before super.initializeOp() reads inputObjInspectors[0]
+        VectorizedBatchUtil.convertToStandardStructObjectInspector((StructObjectInspector) inputObjInspectors[0]);
+
+    Collection<Future<?>> result = super.initializeOp(hconf);
+    assert result.isEmpty(); // only checked when JVM assertions are enabled
+
+    firstBatch = true; // defer extractor setup to the first process() call
+
+    return result;
+  }
+
+  @Override
+  public void process(Object row, int tag) throws HiveException {
+    VectorizedRowBatch batch = (VectorizedRowBatch) row; // in vectorized mode "row" is a whole batch
+
+    if (firstBatch) { // one-time lazy setup of the row extractor and the row buffer
+      vectorExtractRowDynBatch = new VectorExtractRowDynBatch();
+      vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns());
+
+      singleRow = new Object[vectorExtractRowDynBatch.getCount()];
+
+      firstBatch = false;
+    }
+    vectorExtractRowDynBatch.setBatchOnEntry(batch);
+    if (batch.selectedInUse) { // only the rows listed in selected[0..size) are processed
+      int selected[] = batch.selected;
+      for (int logical = 0 ; logical < batch.size; logical++) {
+        int batchIndex = selected[logical]; // map logical position to physical row index
+        vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+        super.process(singleRow, tag); // row-mode path; super ignores tag and uses conf.getTag()
+      }
+    } else { // no selection vector: rows 0..size-1 are processed in order
+      for (int batchIndex = 0 ; batchIndex < batch.size; batchIndex++) {
+        vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+        super.process(singleRow, tag);
+      }
+    }
+
+    vectorExtractRowDynBatch.forgetBatchOnExit(); // NOTE(review): presumably drops the extractor's reference to this batch - confirm
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/a56d9f73/ql/src/java/org/apache/hadoop/hive/ql/optimizer/OperatorComparatorFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/OperatorComparatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/OperatorComparatorFactory.java
index d93bd72..3518823 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/OperatorComparatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/OperatorComparatorFactory.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorLimitOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSparkHashTableSinkOperator;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
@@ -88,6 +89,8 @@ public class OperatorComparatorFactory {
     comparatorMapping.put(SMBMapJoinOperator.class, new SMBMapJoinOperatorComparator());
     comparatorMapping.put(LimitOperator.class, new LimitOperatorComparator());
     comparatorMapping.put(SparkHashTableSinkOperator.class, new SparkHashTableSinkOperatorComparator());
+    comparatorMapping.put(VectorSparkHashTableSinkOperator.class,
+        new SparkHashTableSinkOperatorComparator());
     comparatorMapping.put(LateralViewJoinOperator.class, new LateralViewJoinOperatorComparator());
     comparatorMapping.put(VectorGroupByOperator.class, new GroupByOperatorComparator());
     comparatorMapping.put(CommonMergeJoinOperator.class, new MapJoinOperatorComparator());

http://git-wip-us.apache.org/repos/asf/hive/blob/a56d9f73/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
index 7ebd18d..f88fd0a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
@@ -390,7 +390,7 @@ public class GenSparkSkewJoinProcessor {
         new ArrayList<Operator<? extends OperatorDesc>>();
     tableScanParents.add(tableScan);
     hashTableSinkOp.setParentOperators(tableScanParents);
-    hashTableSinkOp.setTag(tag);
+    hashTableSinkOp.getConf().setTag(tag);
   }
 
   private static void setMemUsage(MapJoinOperator mapJoinOp, Task<? extends Serializable> task,

http://git-wip-us.apache.org/repos/asf/hive/blob/a56d9f73/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index e7b9c73..ad47547 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -89,6 +89,7 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
+import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.TezWork;
@@ -920,6 +921,10 @@ public class Vectorizer implements PhysicalPlanResolver {
       case EVENT:
         ret = true;
         break;
+      case HASHTABLESINK:
+        ret = op instanceof SparkHashTableSinkOperator &&
+            validateSparkHashTableSinkOperator((SparkHashTableSinkOperator) op);
+        break;
       default:
         ret = false;
         break;
@@ -962,6 +967,10 @@ public class Vectorizer implements PhysicalPlanResolver {
       case EVENT:
         ret = true;
         break;
+      case HASHTABLESINK:
+        ret = op instanceof SparkHashTableSinkOperator &&
+            validateSparkHashTableSinkOperator((SparkHashTableSinkOperator) op);
+        break;
       default:
         ret = false;
         break;
@@ -1085,6 +1094,17 @@ public class Vectorizer implements PhysicalPlanResolver {
     return true;
   }
 
+  private boolean validateSparkHashTableSinkOperator(SparkHashTableSinkOperator op) {
+    SparkHashTableSinkDesc desc = op.getConf();
+    byte tag = desc.getTag();
+    // Vectorizable only if every filter/key/value expr of this table validates; the desc is essentially a MapJoinDesc
+    List<ExprNodeDesc> filterExprs = desc.getFilters().get(tag);
+    List<ExprNodeDesc> keyExprs = desc.getKeys().get(tag);
+    List<ExprNodeDesc> valueExprs = desc.getExprs().get(tag);
+    return validateExprNodeDesc(filterExprs, VectorExpressionDescriptor.Mode.FILTER) && // filters are checked in FILTER mode
+        validateExprNodeDesc(keyExprs) && validateExprNodeDesc(valueExprs); // && short-circuits on the first failure
+  }
+
   private boolean validateReduceSinkOperator(ReduceSinkOperator op) {
     List<ExprNodeDesc> keyDescs = op.getConf().getKeyCols();
     List<ExprNodeDesc> partitionDescs = op.getConf().getPartitionCols();
@@ -1671,6 +1691,7 @@ public class Vectorizer implements PhysicalPlanResolver {
       case LIMIT:
       case EXTRACT:
       case EVENT:
+      case HASHTABLESINK:
         vectorOp = OperatorFactory.getVectorOperator(op.getConf(), vContext);
         break;
       default:

http://git-wip-us.apache.org/repos/asf/hive/blob/a56d9f73/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
index fd42959..76517e4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
@@ -283,7 +283,7 @@ public class SparkReduceSinkMapJoinProc implements NodeProcessor {
       parent.replaceChild(parentRS, hashTableSinkOp);
     }
     hashTableSinkOp.setParentOperators(rsParentOps);
-    hashTableSinkOp.setTag(tag);
+    hashTableSinkOp.getConf().setTag(tag);
     return true;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/a56d9f73/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkHashTableSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkHashTableSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkHashTableSinkDesc.java
index ff32f5e..8833ae3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkHashTableSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkHashTableSinkDesc.java
@@ -26,10 +26,21 @@ package org.apache.hadoop.hive.ql.plan;
 public class SparkHashTableSinkDesc extends HashTableSinkDesc {
   private static final long serialVersionUID = 1L;
 
+  // The position of this table; used to index the per-table keys, filters, value exprs and hash tables
+  private byte tag;
+
   public SparkHashTableSinkDesc() {
   }
 
   public SparkHashTableSinkDesc(MapJoinDesc clone) {
     super(clone);
   }
+
+  public byte getTag() {
+    return tag;
+  }
+
+  public void setTag(byte tag) {
+    this.tag = tag;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/a56d9f73/ql/src/test/results/clientpositive/spark/vector_decimal_mapjoin.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/vector_decimal_mapjoin.q.out b/ql/src/test/results/clientpositive/spark/vector_decimal_mapjoin.q.out
index a80a20b..0ab301b 100644
--- a/ql/src/test/results/clientpositive/spark/vector_decimal_mapjoin.q.out
+++ b/ql/src/test/results/clientpositive/spark/vector_decimal_mapjoin.q.out
@@ -102,6 +102,7 @@ STAGE PLANS:
                         1 dec (type: decimal(6,2))
             Local Work:
               Map Reduce Local Work
+            Execution mode: vectorized
 
   Stage: Stage-1
     Spark

http://git-wip-us.apache.org/repos/asf/hive/blob/a56d9f73/ql/src/test/results/clientpositive/spark/vector_left_outer_join.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/vector_left_outer_join.q.out b/ql/src/test/results/clientpositive/spark/vector_left_outer_join.q.out
index a0e6c2a..024be1c 100644
--- a/ql/src/test/results/clientpositive/spark/vector_left_outer_join.q.out
+++ b/ql/src/test/results/clientpositive/spark/vector_left_outer_join.q.out
@@ -41,6 +41,7 @@ STAGE PLANS:
                         1 _col0 (type: int)
             Local Work:
               Map Reduce Local Work
+            Execution mode: vectorized
         Map 4 
             Map Operator Tree:
                 TableScan
@@ -56,6 +57,7 @@ STAGE PLANS:
                         1 _col0 (type: tinyint)
             Local Work:
               Map Reduce Local Work
+            Execution mode: vectorized
 
   Stage: Stage-1
     Spark

http://git-wip-us.apache.org/repos/asf/hive/blob/a56d9f73/ql/src/test/results/clientpositive/spark/vectorized_mapjoin.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/vectorized_mapjoin.q.out b/ql/src/test/results/clientpositive/spark/vectorized_mapjoin.q.out
index 78b0b37..ecfe42b 100644
--- a/ql/src/test/results/clientpositive/spark/vectorized_mapjoin.q.out
+++ b/ql/src/test/results/clientpositive/spark/vectorized_mapjoin.q.out
@@ -34,6 +34,7 @@ STAGE PLANS:
                         1 cint (type: int)
             Local Work:
               Map Reduce Local Work
+            Execution mode: vectorized
 
   Stage: Stage-1
     Spark

http://git-wip-us.apache.org/repos/asf/hive/blob/a56d9f73/ql/src/test/results/clientpositive/spark/vectorized_nested_mapjoin.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/vectorized_nested_mapjoin.q.out b/ql/src/test/results/clientpositive/spark/vectorized_nested_mapjoin.q.out
index a25d540..7ba64b7 100644
--- a/ql/src/test/results/clientpositive/spark/vectorized_nested_mapjoin.q.out
+++ b/ql/src/test/results/clientpositive/spark/vectorized_nested_mapjoin.q.out
@@ -34,6 +34,7 @@ STAGE PLANS:
                           1 _col0 (type: tinyint)
             Local Work:
               Map Reduce Local Work
+            Execution mode: vectorized
         Map 4 
             Map Operator Tree:
                 TableScan
@@ -52,6 +53,7 @@ STAGE PLANS:
                           1 _col0 (type: smallint)
             Local Work:
               Map Reduce Local Work
+            Execution mode: vectorized
 
   Stage: Stage-1
     Spark