You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2014/09/29 20:38:46 UTC

svn commit: r1628251 - in /hive/branches/branch-0.14/ql/src: java/org/apache/hadoop/hive/ql/exec/vector/ java/org/apache/hadoop/hive/ql/optimizer/physical/ test/results/clientpositive/tez/

Author: prasanthj
Date: Mon Sep 29 18:38:46 2014
New Revision: 1628251

URL: http://svn.apache.org/r1628251
Log:
HIVE-8226: Vectorize dynamic partitioning in VectorFileSinkOperator (Matt McCline reviewed by Prasanth J)

Modified:
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
    hive/branches/branch-0.14/ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java?rev=1628251&r1=1628250&r2=1628251&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java Mon Sep 29 18:38:46 2014
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -45,7 +46,8 @@ public class VectorExtractOperator exten
   private int keyColCount;
   private int valueColCount;
   
-  private transient int [] projectedColumns = null;
+  private transient VectorizedRowBatch outputBatch;
+  private transient int remainingColCount;
 
   public VectorExtractOperator(VectorizationContext vContext, OperatorDesc conf)
       throws HiveException {
@@ -57,26 +59,25 @@ public class VectorExtractOperator exten
     super();
   }
 
-  private StructObjectInspector makeStandardStructObjectInspector(StructObjectInspector structObjectInspector) {
-    List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs();
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    StructObjectInspector structInputObjInspector = (StructObjectInspector) inputObjInspectors[0];
+    List<? extends StructField> fields = structInputObjInspector.getAllStructFieldRefs();
     ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
     ArrayList<String> colNames = new ArrayList<String>();
-    for (StructField field: fields) {
-      colNames.add(field.getFieldName());
+    for (int i = keyColCount; i < fields.size(); i++) {
+      StructField field = fields.get(i);
+      String fieldName = field.getFieldName();
+
+      // Remove "VALUE." prefix.
+      int dotIndex = fieldName.indexOf(".");
+      colNames.add(fieldName.substring(dotIndex + 1));
       ois.add(field.getFieldObjectInspector());
     }
-    return ObjectInspectorFactory
+    outputObjInspector = ObjectInspectorFactory
               .getStandardStructObjectInspector(colNames, ois);
-    }
- 
-  @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
-    outputObjInspector = inputObjInspectors[0];
-    LOG.info("VectorExtractOperator class of outputObjInspector is " + outputObjInspector.getClass().getName());
-    projectedColumns = new int [valueColCount];
-    for (int i = 0; i < valueColCount; i++) {
-      projectedColumns[i] = keyColCount + i;
-    }
+    remainingColCount = fields.size() - keyColCount;
+    outputBatch =  new VectorizedRowBatch(remainingColCount);
     initializeChildren(hconf);
   }
 
@@ -86,20 +87,16 @@ public class VectorExtractOperator exten
   }
   
   @Override
-  // Evaluate vectorized batches of rows and forward them.
+  // Remove the key columns and forward the values (and scratch columns).
   public void processOp(Object row, int tag) throws HiveException {
-    VectorizedRowBatch vrg = (VectorizedRowBatch) row;
+    VectorizedRowBatch inputBatch = (VectorizedRowBatch) row;
+
+    // Copy references to the input columns array starting after the keys...
+    for (int i = 0; i < remainingColCount; i++) {
+      outputBatch.cols[i] = inputBatch.cols[keyColCount + i];
+    }
+    outputBatch.size = inputBatch.size;
 
-    // Project away the key columns...
-    int[] originalProjections = vrg.projectedColumns;
-    int originalProjectionSize = vrg.projectionSize;
-    vrg.projectionSize = valueColCount;
-    vrg.projectedColumns = this.projectedColumns;
-
-    forward(vrg, outputObjInspector);
-
-    // Revert the projected columns back, because vrg will be re-used.
-    vrg.projectionSize = originalProjectionSize;
-    vrg.projectedColumns = originalProjections;
+    forward(outputBatch, outputObjInspector);
   }
 }

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java?rev=1628251&r1=1628250&r2=1628251&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java Mon Sep 29 18:38:46 2014
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
-import java.io.IOException;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
@@ -27,16 +25,7 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.ObjectWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 
 /**
  * File Sink operator implementation.
@@ -69,113 +58,10 @@ public class VectorFileSinkOperator exte
 
   @Override
   public void processOp(Object data, int tag) throws HiveException {
-
     VectorizedRowBatch vrg = (VectorizedRowBatch)data;
-
-    Writable [] records = null;
-    boolean vectorizedSerde = false;
-    try {
-      if (serializer instanceof VectorizedSerde) {
-        recordValue = ((VectorizedSerde) serializer).serializeVector(vrg,
-            inputObjInspectors[0]);
-        records = (Writable[]) ((ObjectWritable) recordValue).get();
-        vectorizedSerde = true;
-      }
-    } catch (SerDeException e1) {
-      throw new HiveException(e1);
-    }
-
     for (int i = 0; i < vrg.size; i++) {
-      Writable row = null;
-      if (vectorizedSerde) {
-        row = records[i];
-      } else {
-        if (vrg.valueWriters == null) {
-          vrg.setValueWriters(this.valueWriters);
-        }
-        try {
-          row = serializer.serialize(getRowObject(vrg, i), inputObjInspectors[0]);
-        } catch (SerDeException ex) {
-          throw new HiveException(ex);
-        }
-      }
-    /* Create list bucketing sub-directory only if stored-as-directories is on. */
-    String lbDirName = null;
-    lbDirName = (lbCtx == null) ? null : generateListBucketingDirName(row);
-
-    FSPaths fpaths;
-
-    if (!bDynParts && !filesCreated) {
-      if (lbDirName != null) {
-        FSPaths fsp2 = lookupListBucketingPaths(lbDirName);
-      } else {
-        createBucketFiles(fsp);
-      }
-    }
-
-    try {
-      updateProgress();
-
-      // if DP is enabled, get the final output writers and prepare the real output row
-      assert inputObjInspectors[0].getCategory() == ObjectInspector.Category.STRUCT : "input object inspector is not struct";
-
-      if (bDynParts) {
-        // copy the DP column values from the input row to dpVals
-        dpVals.clear();
-        dpWritables.clear();
-        ObjectInspectorUtils.partialCopyToStandardObject(dpWritables, row, dpStartCol, numDynParts,
-            (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE);
-        // get a set of RecordWriter based on the DP column values
-        // pass the null value along to the escaping process to determine what the dir should be
-        for (Object o : dpWritables) {
-          if (o == null || o.toString().length() == 0) {
-            dpVals.add(dpCtx.getDefaultPartitionName());
-          } else {
-            dpVals.add(o.toString());
-          }
-        }
-        fpaths = getDynOutPaths(dpVals, lbDirName);
-
-      } else {
-        if (lbDirName != null) {
-          fpaths = lookupListBucketingPaths(lbDirName);
-        } else {
-          fpaths = fsp;
-        }
-      }
-
-      rowOutWriters = fpaths.getOutWriters();
-      // check if all record writers implement statistics. if atleast one RW
-      // doesn't implement stats interface we will fallback to conventional way
-      // of gathering stats
-      isCollectRWStats = areAllTrue(statsFromRecordWriter);
-      if (conf.isGatherStats() && !isCollectRWStats) {
-        if (statsCollectRawDataSize) {
-          SerDeStats stats = serializer.getSerDeStats();
-          if (stats != null) {
-            fpaths.getStat().addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
-          }
-        }
-        fpaths.getStat().addToStat(StatsSetupConst.ROW_COUNT, 1);
-      }
-
-
-      if (row_count != null) {
-        row_count.set(row_count.get() + 1);
-      }
-
-      if (!multiFileSpray) {
-        rowOutWriters[0].write(row);
-      } else {
-        int keyHashCode = 0;
-        key.setHashCode(keyHashCode);
-        int bucketNum = prtner.getBucket(key, null, totalFiles);
-        int idx = bucketMap.get(bucketNum);
-        rowOutWriters[idx].write(row);
-      }
-    } catch (IOException e) {
-      throw new HiveException(e);
-    }
+      Object[] row = getRowObject(vrg, i);
+      super.processOp(row, tag);
     }
   }
 
@@ -187,7 +73,7 @@ public class VectorFileSinkOperator exte
     }
     for (int i = 0; i < vrg.projectionSize; i++) {
       ColumnVector vectorColumn = vrg.cols[vrg.projectedColumns[i]];
-      singleRow[i] = vrg.valueWriters[i].writeValue(vectorColumn, batchIndex);
+      singleRow[i] = valueWriters[i].writeValue(vectorColumn, batchIndex);
     }
     return singleRow;
   }

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java?rev=1628251&r1=1628250&r2=1628251&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java Mon Sep 29 18:38:46 2014
@@ -1062,11 +1062,6 @@ public class Vectorizer implements Physi
   }
 
   private boolean validateFileSinkOperator(FileSinkOperator op) {
-    // HIVE-7557: For now, turn off dynamic partitioning to give more time to 
-    // figure out how to make VectorFileSink work correctly with it...
-   if (op.getConf().getDynPartCtx() != null) {
-     return false;
-   }
    return true;
   }
 

Modified: hive/branches/branch-0.14/ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out?rev=1628251&r1=1628250&r2=1628251&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out (original)
+++ hive/branches/branch-0.14/ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out Mon Sep 29 18:38:46 2014
@@ -214,6 +214,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
                       serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
                       name: default.over1k_part_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -300,6 +301,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
                       serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
                       name: default.over1k_part_limit_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -368,6 +370,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
                       serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
                       name: default.over1k_part_buck_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -435,6 +438,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
                       serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
                       name: default.over1k_part_buck_sort_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -585,6 +589,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
                       serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
                       name: default.over1k_part_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -671,6 +676,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
                       serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
                       name: default.over1k_part_limit_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -739,6 +745,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
                       serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
                       name: default.over1k_part_buck_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -806,6 +813,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
                       serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
                       name: default.over1k_part_buck_sort_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -1362,6 +1370,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                       name: default.over1k_part2_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -1443,6 +1452,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                       name: default.over1k_part2_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -1531,6 +1541,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                       name: default.over1k_part2_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -1610,6 +1621,7 @@ STAGE PLANS:
                         output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                         serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                         name: default.over1k_part2_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -1703,6 +1715,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                       name: default.over1k_part2_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -2097,6 +2110,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                       name: default.over1k_part_buck_sort2_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -2164,6 +2178,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                       name: default.over1k_part_buck_sort2_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection