You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2014/10/03 20:00:41 UTC

svn commit: r1629273 [3/5] - in /hive/branches/branch-0.14: data/files/ itests/src/test/resources/ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/ ql/src/test/queries/clientpositive/ ql/...

Modified: hive/branches/branch-0.14/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/itests/src/test/resources/testconfiguration.properties?rev=1629273&r1=1629272&r2=1629273&view=diff
==============================================================================
--- hive/branches/branch-0.14/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/branches/branch-0.14/itests/src/test/resources/testconfiguration.properties Fri Oct  3 18:00:41 2014
@@ -159,6 +159,7 @@ minitez.query.files.shared=alter_merge_2
   vector_cast_constant.q,\
   vector_char_4.q,\
   vector_char_simple.q,\
+  vector_count_distinct.q,\
   vector_data_types.q,\
   vector_decimal_aggregate.q,\
   vector_distinct_2.q,\

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java?rev=1629273&r1=1629272&r2=1629273&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java Fri Oct  3 18:00:41 2014
@@ -18,98 +18,30 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.PTFTopNHash;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TopNHash;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
-import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
-// import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
 public class VectorReduceSinkOperator extends ReduceSinkOperator {
 
-  private static final Log LOG = LogFactory.getLog(
-      VectorReduceSinkOperator.class.getName());
-
   private static final long serialVersionUID = 1L;
 
-  /**
-   * The evaluators for the key columns. Key columns decide the sort order on
-   * the reducer side. Key columns are passed to the reducer in the "key".
-   */
-  private VectorExpression[] keyEval;
-
-  /**
-   * The key value writers. These know how to write the necessary writable type
-   * based on key column metadata, from the primitive vector type.
-   */
-  private transient VectorExpressionWriter[] keyWriters;
-
-  /**
-   * The evaluators for the value columns. Value columns are passed to reducer
-   * in the "value".
-   */
-  private VectorExpression[] valueEval;
-
-  /**
-   * The output value writers. These know how to write the necessary writable type
-   * based on value column metadata, from the primitive vector type.
-   */
-  private transient VectorExpressionWriter[] valueWriters;
-
-  /**
-   * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in
-   * Hive language). Partition columns decide the reducer that the current row
-   * goes to. Partition columns are not passed to reducer.
-   */
-  private VectorExpression[] partitionEval;
-
-  /**
-  * Evaluators for bucketing columns. This is used to compute bucket number.
-  */
-  private VectorExpression[] bucketEval;
-  private int buckColIdxInKey;
-
-  /**
-   * The partition value writers. These know how to write the necessary writable type
-   * based on partition column metadata, from the primitive vector type.
-   */
-  private transient VectorExpressionWriter[] partitionWriters;
-  private transient VectorExpressionWriter[] bucketWriters = null;
-
-  private static final boolean isDebugEnabled = LOG.isDebugEnabled();
+  // Writers (one per column) for producing a row object from the input batch.
+  private VectorExpressionWriter[] rowWriters;
+  
+  protected transient Object[] singleRow;
 
   public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf)
       throws HiveException {
     this();
     ReduceSinkDesc desc = (ReduceSinkDesc) conf;
     this.conf = desc;
-    keyEval = vContext.getVectorExpressions(desc.getKeyCols());
-    valueEval = vContext.getVectorExpressions(desc.getValueCols());
-    partitionEval = vContext.getVectorExpressions(desc.getPartitionCols());
-    bucketEval = null;
-    if (desc.getBucketCols() != null && !desc.getBucketCols().isEmpty()) {
-      bucketEval = vContext.getVectorExpressions(desc.getBucketCols());
-      buckColIdxInKey = desc.getPartitionCols().size();
-    }
   }
 
   public VectorReduceSinkOperator() {
@@ -118,406 +50,49 @@ public class VectorReduceSinkOperator ex
 
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
-    try {
-      numDistributionKeys = conf.getNumDistributionKeys();
-      distinctColIndices = conf.getDistinctColumnIndices();
-      numDistinctExprs = distinctColIndices.size();
-
-      TableDesc keyTableDesc = conf.getKeySerializeInfo();
-      keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
-          .newInstance();
-      keySerializer.initialize(null, keyTableDesc.getProperties());
-      keyIsText = keySerializer.getSerializedClass().equals(Text.class);
-
-      /*
-       * Compute and assign the key writers and the key object inspector
-       */
-      VectorExpressionWriterFactory.processVectorExpressions(
-          conf.getKeyCols(),
-          conf.getOutputKeyColumnNames(),
-          new VectorExpressionWriterFactory.SingleOIDClosure() {
-            @Override
-            public void assign(VectorExpressionWriter[] writers,
-              ObjectInspector objectInspector) {
-              keyWriters = writers;
-              keyObjectInspector = objectInspector;
-            }
-          });
-
-      String colNames = "";
-      for(String colName : conf.getOutputKeyColumnNames()) {
-        colNames = String.format("%s %s", colNames, colName);
-      }
-
-      if (isDebugEnabled) {
-        LOG.debug(String.format("keyObjectInspector [%s]%s => %s",
-          keyObjectInspector.getClass(),
-          keyObjectInspector,
-          colNames));
-      }
-
-      partitionWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getPartitionCols());
-      if (conf.getBucketCols() != null && !conf.getBucketCols().isEmpty()) {
-        bucketWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getBucketCols());
-      }
-
-      TableDesc valueTableDesc = conf.getValueSerializeInfo();
-      valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
-          .newInstance();
-      valueSerializer.initialize(null, valueTableDesc.getProperties());
-
-      /*
-       * Compute and assign the value writers and the value object inspector
-       */
-      VectorExpressionWriterFactory.processVectorExpressions(
-          conf.getValueCols(),
-          conf.getOutputValueColumnNames(),
-          new VectorExpressionWriterFactory.SingleOIDClosure() {
-            @Override
-            public void assign(VectorExpressionWriter[] writers,
-                ObjectInspector objectInspector) {
-                valueWriters = writers;
-                valueObjectInspector = objectInspector;
+    // We need an input object inspector for the row we will extract out of the
+    // vectorized row batch, not, for example, the original inspector for an ORC table.
+    VectorExpressionWriterFactory.processVectorInspector(
+            (StructObjectInspector) inputObjInspectors[0],
+            new VectorExpressionWriterFactory.SingleOIDClosure() {
+              @Override
+              public void assign(VectorExpressionWriter[] writers,
+                  ObjectInspector objectInspector) {
+                rowWriters = writers;
+                inputObjInspectors[0] = objectInspector;
               }
-          });
-
-      if (isDebugEnabled) {
-        colNames = "";
-        for(String colName : conf.getOutputValueColumnNames()) {
-          colNames = String.format("%s %s", colNames, colName);
-        }
-      }
-
-      if (isDebugEnabled) {
-        LOG.debug(String.format("valueObjectInspector [%s]%s => %s",
-            valueObjectInspector.getClass(),
-            valueObjectInspector,
-            colNames));
-      }
+            });
+    singleRow = new Object[rowWriters.length];
 
-      int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1;
-      int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 :
-        numDistributionKeys;
-      cachedKeys = new Object[numKeys][keyLen];
-      cachedValues = new Object[valueEval.length];
-
-      int tag = conf.getTag();
-      tagByte[0] = (byte) tag;
-      LOG.info("Using tag = " + tag);
-
-      int limit = conf.getTopN();
-      float memUsage = conf.getTopNMemoryUsage();
-      if (limit >= 0 && memUsage > 0) {
-        reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : reducerHash;
-        reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
-      }
-
-      autoParallel = conf.isAutoParallel();
-
-    } catch(Exception e) {
-      throw new HiveException(e);
-    }
+    // Call ReduceSinkOperator with new input inspector.
+    super.initializeOp(hconf);
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
-    VectorizedRowBatch vrg = (VectorizedRowBatch) row;
-
-    if (isDebugEnabled) {
-      LOG.debug(String.format("sinking %d rows, %d values, %d keys, %d parts",
-          vrg.size,
-          valueEval.length,
-          keyEval.length,
-          partitionEval.length));
-    }
-
-    try {
-      // Evaluate the keys
-      for (int i = 0; i < keyEval.length; i++) {
-        keyEval[i].evaluate(vrg);
-      }
-
-      // Determine which rows we need to emit based on topN optimization
-      int startResult = reducerHash.startVectorizedBatch(vrg.size);
-      if (startResult == TopNHash.EXCLUDE) {
-        return; // TopN wants us to exclude all rows.
-      }
-      // TODO: can we do this later/only for the keys that are needed? E.g. update vrg.selected.
-      for (int i = 0; i < partitionEval.length; i++) {
-        partitionEval[i].evaluate(vrg);
-      }
-      if (bucketEval != null) {
-        for (int i = 0; i < bucketEval.length; i++) {
-          bucketEval[i].evaluate(vrg);
-        }
-      }
-      // run the vector evaluations
-      for (int i = 0; i < valueEval.length; i++) {
-         valueEval[i].evaluate(vrg);
-      }
-
-      boolean useTopN = startResult != TopNHash.FORWARD;
-      // Go thru the batch once. If we are not using TopN, we will forward all things and be done.
-      // If we are using topN, we will make the first key for each row and store/forward it.
-      // Values, hashes and additional distinct rows will be handled in the 2nd pass in that case.
-      for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
-        int rowIndex = batchIndex;
-        if (vrg.selectedInUse) {
-          rowIndex = vrg.selected[batchIndex];
-        }
-        // First, make distrib key components for this row and determine distKeyLength.
-        populatedCachedDistributionKeys(vrg, rowIndex, 0);
-
-        // replace bucketing columns with hashcode % numBuckets
-        if (bucketEval != null) {
-          bucketNumber = computeBucketNumber(vrg, rowIndex, conf.getNumBuckets());
-          cachedKeys[0][buckColIdxInKey] = new Text(String.valueOf(bucketNumber));
-        }
-        HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
-        int distKeyLength = firstKey.getDistKeyLength();
-        // Add first distinct expression, if any.
-        if (numDistinctExprs > 0) {
-          populateCachedDistinctKeys(vrg, rowIndex, 0);
-          firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength);
-        }
-
-        final int hashCode;
-
-        // distKeyLength doesn't include tag, but includes buckNum in cachedKeys[0]
-        if (autoParallel && partitionEval.length > 0) {
-          hashCode = computeMurmurHash(firstKey);
-        } else {
-          hashCode = computeHashCode(vrg, rowIndex);
-        }
-
-        firstKey.setHashCode(hashCode);
-
-        if (useTopN) {
-          /*
-           * in case of TopN for windowing, we need to distinguish between 
-           * rows with null partition keys and rows with value 0 for partition keys.
-           */
-          boolean partkeysNull = conf.isPTFReduceSink() && partitionKeysAreNull(vrg, rowIndex);
-          reducerHash.tryStoreVectorizedKey(firstKey, partkeysNull, batchIndex);
-        } else {
-          // No TopN, just forward the first key and all others.
-          BytesWritable value = makeValueWritable(vrg, rowIndex);
-          collect(firstKey, value);
-          forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 0);
-        }
-      }
-
-      if (!useTopN) return; // All done.
-
-      // If we use topN, we have called tryStore on every key now. We can process the results.
-      for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
-        int result = reducerHash.getVectorizedBatchResult(batchIndex);
-        if (result == TopNHash.EXCLUDE) continue;
-        int rowIndex = batchIndex;
-        if (vrg.selectedInUse) {
-          rowIndex = vrg.selected[batchIndex];
-        }
-        // Compute value and hashcode - we'd either store or forward them.
-        BytesWritable value = makeValueWritable(vrg, rowIndex);
-        int distKeyLength = -1;
-        int hashCode;
-        if (result == TopNHash.FORWARD) {
-          HiveKey firstKey = reducerHash.getVectorizedKeyToForward(batchIndex);
-          distKeyLength = firstKey.getDistKeyLength();
-          hashCode = firstKey.hashCode();
-          collect(firstKey, value);
-        } else {
-          hashCode = reducerHash.getVectorizedKeyHashCode(batchIndex);
-          reducerHash.storeValue(result, hashCode, value, true);
-          distKeyLength = reducerHash.getVectorizedKeyDistLength(batchIndex);
-        }
-        // Now forward other the rows if there's multi-distinct (but see TODO in forward...).
-        // Unfortunately, that means we will have to rebuild the cachedKeys. Start at 1.
-        if (numDistinctExprs > 1) {
-          populatedCachedDistributionKeys(vrg, rowIndex, 1);
-          forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 1);
-        }
-      }
-    } catch (SerDeException e) {
-      throw new HiveException(e);
-    } catch (IOException e) {
-      throw new HiveException(e);
-    }
-  }
-
-  /**
-   * This function creates and forwards all the additional KVs for the multi-distinct case,
-   * after the first (0th) KV pertaining to the row has already been stored or forwarded.
-   * @param vrg the batch
-   * @param rowIndex the row index in the batch
-   * @param hashCode the partitioning hash code to use; same as for the first KV
-   * @param value the value to use; same as for the first KV
-   * @param distKeyLength the distribution key length of the first key; TODO probably extraneous
-   * @param tag the tag
-   * @param baseIndex the index in cachedKeys where the pre-evaluated distribution keys are stored
-   */
-  private void forwardExtraDistinctRows(VectorizedRowBatch vrg, int rowIndex,int hashCode,
-      BytesWritable value, int distKeyLength, int tag, int baseIndex)
-          throws HiveException, SerDeException, IOException {
-    // TODO: We don't have to forward extra distinct rows immediately (same in non-vector) if
-    //       the first key has already been stored. There's few bytes difference between keys
-    //       for different distincts, and the value/etc. are all the same.
-    //       We could store deltas to re-gen extra rows when flushing TopN.
-    for (int i = 1; i < numDistinctExprs; i++) {
-      if (i != baseIndex) {
-        System.arraycopy(cachedKeys[baseIndex], 0, cachedKeys[i], 0, numDistributionKeys);
-      }
-      populateCachedDistinctKeys(vrg, rowIndex, i);
-      HiveKey hiveKey = toHiveKey(cachedKeys[i], tag, distKeyLength);
-      hiveKey.setHashCode(hashCode);
-      collect(hiveKey, value);
-    }
-  }
-
-  /**
-   * Populate distribution keys part of cachedKeys for a particular row from the batch.
-   * @param vrg the batch
-   * @param rowIndex the row index in the batch
-   * @param index the cachedKeys index to write to
-   */
-  private void populatedCachedDistributionKeys(
-      VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException {
-    for (int i = 0; i < numDistributionKeys; i++) {
-      int batchColumn = keyEval[i].getOutputColumn();
-      ColumnVector vectorColumn = vrg.cols[batchColumn];
-      cachedKeys[index][i] = keyWriters[i].writeValue(vectorColumn, rowIndex);
-    }
-    if (cachedKeys[index].length > numDistributionKeys) {
-      cachedKeys[index][numDistributionKeys] = null;
-    }
-  }
-
-  /**
-   * Populate distinct keys part of cachedKeys for a particular row from the batch.
-   * @param vrg the batch
-   * @param rowIndex the row index in the batch
-   * @param index the cachedKeys index to write to
-   */
-  private void populateCachedDistinctKeys(
-      VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException {
-    StandardUnion union;
-    cachedKeys[index][numDistributionKeys] = union = new StandardUnion(
-        (byte)index, new Object[distinctColIndices.get(index).size()]);
-    Object[] distinctParameters = (Object[]) union.getObject();
-    for (int distinctParamI = 0; distinctParamI < distinctParameters.length; distinctParamI++) {
-      int distinctColIndex = distinctColIndices.get(index).get(distinctParamI);
-      int batchColumn = keyEval[distinctColIndex].getOutputColumn();
-      distinctParameters[distinctParamI] =
-          keyWriters[distinctColIndex].writeValue(vrg.cols[batchColumn], rowIndex);
-    }
-    union.setTag((byte) index);
-  }
+  public void processOp(Object data, int tag) throws HiveException {
+    VectorizedRowBatch vrg = (VectorizedRowBatch) data;
 
-  private BytesWritable makeValueWritable(VectorizedRowBatch vrg, int rowIndex)
-      throws HiveException, SerDeException {
-    int length = valueEval.length;
-
-    // in case of bucketed table, insert the bucket number as the last column in value
-    if (bucketEval != null) {
-      length -= 1;
-      cachedValues[length] = new Text(String.valueOf(bucketNumber));
+    for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
+      Object row = getRowObject(vrg, batchIndex);
+      super.processOp(row, tag);
     }
-
-    for (int i = 0; i < length; i++) {
-      int batchColumn = valueEval[i].getOutputColumn();
-      ColumnVector vectorColumn = vrg.cols[batchColumn];
-      cachedValues[i] = valueWriters[i].writeValue(vectorColumn, rowIndex);
-    }
-    // Serialize the value
-    return (BytesWritable)valueSerializer.serialize(cachedValues, valueObjectInspector);
   }
 
-  private int computeHashCode(VectorizedRowBatch vrg, int rowIndex) throws HiveException {
-    // Evaluate the HashCode
-    int keyHashCode = 0;
-    if (partitionEval.length == 0) {
-      // If no partition cols, just distribute the data uniformly to provide better
-      // load balance. If the requirement is to have a single reducer, we should set
-      // the number of reducers to 1.
-      // Use a constant seed to make the code deterministic.
-      if (random == null) {
-        random = new Random(12345);
-      }
-      keyHashCode = random.nextInt();
-    } else {
-      for (int p = 0; p < partitionEval.length; p++) {
-        ColumnVector columnVector = vrg.cols[partitionEval[p].getOutputColumn()];
-        Object partitionValue = partitionWriters[p].writeValue(columnVector, rowIndex);
-        keyHashCode = keyHashCode
-            * 31
-            + ObjectInspectorUtils.hashCode(
-                partitionValue,
-                partitionWriters[p].getObjectInspector());
-      }
-    }
-    return bucketNumber < 0  ? keyHashCode : keyHashCode * 31 + bucketNumber;
-  }
-
-  private boolean partitionKeysAreNull(VectorizedRowBatch vrg, int rowIndex)
+  private Object[] getRowObject(VectorizedRowBatch vrg, int rowIndex)
       throws HiveException {
-    if (partitionEval.length != 0) {
-      for (int p = 0; p < partitionEval.length; p++) {
-        ColumnVector columnVector = vrg.cols[partitionEval[p].getOutputColumn()];
-        Object partitionValue = partitionWriters[p].writeValue(columnVector,
-            rowIndex);
-        if (partitionValue != null) {
-          return false;
-        }
+    int batchIndex = rowIndex;
+    if (vrg.selectedInUse) {
+      batchIndex = vrg.selected[rowIndex];
+    }
+    for (int i = 0; i < vrg.projectionSize; i++) {
+      ColumnVector vectorColumn = vrg.cols[vrg.projectedColumns[i]];
+      if (vectorColumn != null) {
+        singleRow[i] = rowWriters[i].writeValue(vectorColumn, batchIndex);
+      } else {
+        // Some columns from tables are not used.
+        singleRow[i] = null;
       }
-      return true;
-    }
-    return false;
-  }
-
-  private int computeBucketNumber(VectorizedRowBatch vrg, int rowIndex, int numBuckets) throws HiveException {
-    int bucketNum = 0;
-    for (int p = 0; p < bucketEval.length; p++) {
-      ColumnVector columnVector = vrg.cols[bucketEval[p].getOutputColumn()];
-      Object bucketValue = bucketWriters[p].writeValue(columnVector, rowIndex);
-      bucketNum = bucketNum
-          * 31
-          + ObjectInspectorUtils.hashCode(
-              bucketValue,
-              bucketWriters[p].getObjectInspector());
-    }
-
-    if (bucketNum < 0) {
-      bucketNum = -1 * bucketNum;
     }
-
-    return bucketNum % numBuckets;
-  }
-
-  static public String getOperatorName() {
-    return "RS";
-  }
-
-  public VectorExpression[] getPartitionEval() {
-    return partitionEval;
-  }
-
-  public void setPartitionEval(VectorExpression[] partitionEval) {
-    this.partitionEval = partitionEval;
-  }
-
-  public VectorExpression[] getValueEval() {
-    return valueEval;
-  }
-
-  public void setValueEval(VectorExpression[] valueEval) {
-    this.valueEval = valueEval;
-  }
-
-  public VectorExpression[] getKeyEval() {
-    return keyEval;
-  }
-
-  public void setKeyEval(VectorExpression[] keyEval) {
-    this.keyEval = keyEval;
+    return singleRow;
   }
 }

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java?rev=1629273&r1=1629272&r2=1629273&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java Fri Oct  3 18:00:41 2014
@@ -1060,6 +1060,31 @@ public final class VectorExpressionWrite
     closure.assign(writers, oids);
   }
 
+  /**
+   * Creates one value writer per field of a struct object inspector, builds the matching
+   * standard struct output object inspector, and hands both to the closure.
+   */
+  public static void processVectorInspector(
+      StructObjectInspector structObjInspector,
+      SingleOIDClosure closure)
+      throws HiveException {
+    List<? extends StructField> fields = structObjInspector.getAllStructFieldRefs();
+    VectorExpressionWriter[] writers = new VectorExpressionWriter[fields.size()];
+    List<ObjectInspector> oids = new ArrayList<ObjectInspector>(writers.length);
+    ArrayList<String> columnNames = new ArrayList<String>();
+    int i = 0;
+    for(StructField field : fields) {
+      ObjectInspector fieldObjInsp = field.getFieldObjectInspector();
+      writers[i] = VectorExpressionWriterFactory.
+                genVectorExpressionWritable(fieldObjInsp);
+      columnNames.add(field.getFieldName());
+      oids.add(writers[i].getObjectInspector());
+      i++;
+    }
+    ObjectInspector objectInspector = ObjectInspectorFactory.
+        getStandardStructObjectInspector(columnNames,oids);
+    closure.assign(writers, objectInspector);
+  }
 
   /**
    * Returns {@link VectorExpressionWriter} objects for the fields in the given

Added: hive/branches/branch-0.14/ql/src/test/queries/clientpositive/vector_count_distinct.q
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/test/queries/clientpositive/vector_count_distinct.q?rev=1629273&view=auto
==============================================================================
--- hive/branches/branch-0.14/ql/src/test/queries/clientpositive/vector_count_distinct.q (added)
+++ hive/branches/branch-0.14/ql/src/test/queries/clientpositive/vector_count_distinct.q Fri Oct  3 18:00:41 2014
@@ -0,0 +1,108 @@
+SET hive.vectorized.execution.enabled=true;
+
+create table web_sales_txt
+(
+    ws_sold_date_sk           int,
+    ws_sold_time_sk           int,
+    ws_ship_date_sk           int,
+    ws_item_sk                int,
+    ws_bill_customer_sk       int,
+    ws_bill_cdemo_sk          int,
+    ws_bill_hdemo_sk          int,
+    ws_bill_addr_sk           int,
+    ws_ship_customer_sk       int,
+    ws_ship_cdemo_sk          int,
+    ws_ship_hdemo_sk          int,
+    ws_ship_addr_sk           int,
+    ws_web_page_sk            int,
+    ws_web_site_sk            int,
+    ws_ship_mode_sk           int,
+    ws_warehouse_sk           int,
+    ws_promo_sk               int,
+    ws_order_number           int,
+    ws_quantity               int,
+    ws_wholesale_cost         decimal(7,2),
+    ws_list_price             decimal(7,2),
+    ws_sales_price            decimal(7,2),
+    ws_ext_discount_amt       decimal(7,2),
+    ws_ext_sales_price        decimal(7,2),
+    ws_ext_wholesale_cost     decimal(7,2),
+    ws_ext_list_price         decimal(7,2),
+    ws_ext_tax                decimal(7,2),
+    ws_coupon_amt             decimal(7,2),
+    ws_ext_ship_cost          decimal(7,2),
+    ws_net_paid               decimal(7,2),
+    ws_net_paid_inc_tax       decimal(7,2),
+    ws_net_paid_inc_ship      decimal(7,2),
+    ws_net_paid_inc_ship_tax  decimal(7,2),
+    ws_net_profit             decimal(7,2)
+)
+row format delimited fields terminated by '|' 
+stored as textfile;
+
+LOAD DATA LOCAL INPATH '../../data/files/web_sales_2k' OVERWRITE INTO TABLE web_sales_txt;
+
+------------------------------------------------------------------------------------------
+
+create table web_sales
+(
+    ws_sold_date_sk           int,
+    ws_sold_time_sk           int,
+    ws_ship_date_sk           int,
+    ws_item_sk                int,
+    ws_bill_customer_sk       int,
+    ws_bill_cdemo_sk          int,
+    ws_bill_hdemo_sk          int,
+    ws_bill_addr_sk           int,
+    ws_ship_customer_sk       int,
+    ws_ship_cdemo_sk          int,
+    ws_ship_hdemo_sk          int,
+    ws_ship_addr_sk           int,
+    ws_web_page_sk            int,
+    ws_ship_mode_sk           int,
+    ws_warehouse_sk           int,
+    ws_promo_sk               int,
+    ws_order_number           int,
+    ws_quantity               int,
+    ws_wholesale_cost         decimal(7,2),
+    ws_list_price             decimal(7,2),
+    ws_sales_price            decimal(7,2),
+    ws_ext_discount_amt       decimal(7,2),
+    ws_ext_sales_price        decimal(7,2),
+    ws_ext_wholesale_cost     decimal(7,2),
+    ws_ext_list_price         decimal(7,2),
+    ws_ext_tax                decimal(7,2),
+    ws_coupon_amt             decimal(7,2),
+    ws_ext_ship_cost          decimal(7,2),
+    ws_net_paid               decimal(7,2),
+    ws_net_paid_inc_tax       decimal(7,2),
+    ws_net_paid_inc_ship      decimal(7,2),
+    ws_net_paid_inc_ship_tax  decimal(7,2),
+    ws_net_profit             decimal(7,2)
+)
+partitioned by
+(
+    ws_web_site_sk            int
+)
+stored as orc
+tblproperties ("orc.stripe.size"="33554432", "orc.compress.size"="16384");
+
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+insert overwrite table web_sales
+partition (ws_web_site_sk)
+select ws_sold_date_sk, ws_sold_time_sk, ws_ship_date_sk, ws_item_sk,
+       ws_bill_customer_sk, ws_bill_cdemo_sk, ws_bill_hdemo_sk, ws_bill_addr_sk,
+       ws_ship_customer_sk, ws_ship_cdemo_sk, ws_ship_hdemo_sk, ws_ship_addr_sk,
+       ws_web_page_sk, ws_ship_mode_sk, ws_warehouse_sk, ws_promo_sk, ws_order_number,
+       ws_quantity, ws_wholesale_cost, ws_list_price, ws_sales_price, ws_ext_discount_amt,
+       ws_ext_sales_price, ws_ext_wholesale_cost, ws_ext_list_price, ws_ext_tax,
+       ws_coupon_amt, ws_ext_ship_cost, ws_net_paid, ws_net_paid_inc_tax, ws_net_paid_inc_ship,
+       ws_net_paid_inc_ship_tax, ws_net_profit, ws_web_site_sk from web_sales_txt;
+
+------------------------------------------------------------------------------------------
+
+explain
+select count(distinct ws_order_number) from web_sales;
+
+select count(distinct ws_order_number) from web_sales;
\ No newline at end of file