Posted to commits@hive.apache.org by pr...@apache.org on 2014/10/03 06:46:50 UTC
svn commit: r1629111 [3/5] - in /hive/trunk: data/files/
itests/src/test/resources/ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/
ql/src/test/queries/clientpositive/ ql/src/test/result...
Modified: hive/trunk/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/trunk/itests/src/test/resources/testconfiguration.properties?rev=1629111&r1=1629110&r2=1629111&view=diff
==============================================================================
--- hive/trunk/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/trunk/itests/src/test/resources/testconfiguration.properties Fri Oct 3 04:46:49 2014
@@ -159,6 +159,7 @@ minitez.query.files.shared=alter_merge_2
vector_cast_constant.q,\
vector_char_4.q,\
vector_char_simple.q,\
+ vector_count_distinct.q,\
vector_data_types.q,\
vector_decimal_aggregate.q,\
vector_distinct_2.q,\
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java?rev=1629111&r1=1629110&r2=1629111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java Fri Oct 3 04:46:49 2014
@@ -18,98 +18,30 @@
package org.apache.hadoop.hive.ql.exec.vector;
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.PTFTopNHash;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TopNHash;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
-import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
-// import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
public class VectorReduceSinkOperator extends ReduceSinkOperator {
- private static final Log LOG = LogFactory.getLog(
- VectorReduceSinkOperator.class.getName());
-
private static final long serialVersionUID = 1L;
- /**
- * The evaluators for the key columns. Key columns decide the sort order on
- * the reducer side. Key columns are passed to the reducer in the "key".
- */
- private VectorExpression[] keyEval;
-
- /**
- * The key value writers. These know how to write the necessary writable type
- * based on key column metadata, from the primitive vector type.
- */
- private transient VectorExpressionWriter[] keyWriters;
-
- /**
- * The evaluators for the value columns. Value columns are passed to reducer
- * in the "value".
- */
- private VectorExpression[] valueEval;
-
- /**
- * The output value writers. These know how to write the necessary writable type
- * based on value column metadata, from the primitive vector type.
- */
- private transient VectorExpressionWriter[] valueWriters;
-
- /**
- * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in
- * Hive language). Partition columns decide the reducer that the current row
- * goes to. Partition columns are not passed to reducer.
- */
- private VectorExpression[] partitionEval;
-
- /**
- * Evaluators for bucketing columns. This is used to compute bucket number.
- */
- private VectorExpression[] bucketEval;
- private int buckColIdxInKey;
-
- /**
- * The partition value writers. These know how to write the necessary writable type
- * based on partition column metadata, from the primitive vector type.
- */
- private transient VectorExpressionWriter[] partitionWriters;
- private transient VectorExpressionWriter[] bucketWriters = null;
-
- private static final boolean isDebugEnabled = LOG.isDebugEnabled();
+ // Writers for producing a row from the input batch.
+ private VectorExpressionWriter[] rowWriters;
+
+ protected transient Object[] singleRow;
public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf)
throws HiveException {
this();
ReduceSinkDesc desc = (ReduceSinkDesc) conf;
this.conf = desc;
- keyEval = vContext.getVectorExpressions(desc.getKeyCols());
- valueEval = vContext.getVectorExpressions(desc.getValueCols());
- partitionEval = vContext.getVectorExpressions(desc.getPartitionCols());
- bucketEval = null;
- if (desc.getBucketCols() != null && !desc.getBucketCols().isEmpty()) {
- bucketEval = vContext.getVectorExpressions(desc.getBucketCols());
- buckColIdxInKey = desc.getPartitionCols().size();
- }
}
public VectorReduceSinkOperator() {
@@ -118,406 +50,49 @@ public class VectorReduceSinkOperator ex
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
- try {
- numDistributionKeys = conf.getNumDistributionKeys();
- distinctColIndices = conf.getDistinctColumnIndices();
- numDistinctExprs = distinctColIndices.size();
-
- TableDesc keyTableDesc = conf.getKeySerializeInfo();
- keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
- .newInstance();
- keySerializer.initialize(null, keyTableDesc.getProperties());
- keyIsText = keySerializer.getSerializedClass().equals(Text.class);
-
- /*
- * Compute and assign the key writers and the key object inspector
- */
- VectorExpressionWriterFactory.processVectorExpressions(
- conf.getKeyCols(),
- conf.getOutputKeyColumnNames(),
- new VectorExpressionWriterFactory.SingleOIDClosure() {
- @Override
- public void assign(VectorExpressionWriter[] writers,
- ObjectInspector objectInspector) {
- keyWriters = writers;
- keyObjectInspector = objectInspector;
- }
- });
-
- String colNames = "";
- for(String colName : conf.getOutputKeyColumnNames()) {
- colNames = String.format("%s %s", colNames, colName);
- }
-
- if (isDebugEnabled) {
- LOG.debug(String.format("keyObjectInspector [%s]%s => %s",
- keyObjectInspector.getClass(),
- keyObjectInspector,
- colNames));
- }
-
- partitionWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getPartitionCols());
- if (conf.getBucketCols() != null && !conf.getBucketCols().isEmpty()) {
- bucketWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getBucketCols());
- }
-
- TableDesc valueTableDesc = conf.getValueSerializeInfo();
- valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
- .newInstance();
- valueSerializer.initialize(null, valueTableDesc.getProperties());
-
- /*
- * Compute and assign the value writers and the value object inspector
- */
- VectorExpressionWriterFactory.processVectorExpressions(
- conf.getValueCols(),
- conf.getOutputValueColumnNames(),
- new VectorExpressionWriterFactory.SingleOIDClosure() {
- @Override
- public void assign(VectorExpressionWriter[] writers,
- ObjectInspector objectInspector) {
- valueWriters = writers;
- valueObjectInspector = objectInspector;
+ // We need an input object inspector that is for the row we will extract out of the
+ // vectorized row batch, not, for example, an original inspector for an ORC table, etc.
+ VectorExpressionWriterFactory.processVectorInspector(
+ (StructObjectInspector) inputObjInspectors[0],
+ new VectorExpressionWriterFactory.SingleOIDClosure() {
+ @Override
+ public void assign(VectorExpressionWriter[] writers,
+ ObjectInspector objectInspector) {
+ rowWriters = writers;
+ inputObjInspectors[0] = objectInspector;
}
- });
-
- if (isDebugEnabled) {
- colNames = "";
- for(String colName : conf.getOutputValueColumnNames()) {
- colNames = String.format("%s %s", colNames, colName);
- }
- }
-
- if (isDebugEnabled) {
- LOG.debug(String.format("valueObjectInspector [%s]%s => %s",
- valueObjectInspector.getClass(),
- valueObjectInspector,
- colNames));
- }
+ });
+ singleRow = new Object[rowWriters.length];
- int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1;
- int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 :
- numDistributionKeys;
- cachedKeys = new Object[numKeys][keyLen];
- cachedValues = new Object[valueEval.length];
-
- int tag = conf.getTag();
- tagByte[0] = (byte) tag;
- LOG.info("Using tag = " + tag);
-
- int limit = conf.getTopN();
- float memUsage = conf.getTopNMemoryUsage();
- if (limit >= 0 && memUsage > 0) {
- reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : reducerHash;
- reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
- }
-
- autoParallel = conf.isAutoParallel();
-
- } catch(Exception e) {
- throw new HiveException(e);
- }
+ // Call ReduceSinkOperator with new input inspector.
+ super.initializeOp(hconf);
}
@Override
- public void processOp(Object row, int tag) throws HiveException {
- VectorizedRowBatch vrg = (VectorizedRowBatch) row;
-
- if (isDebugEnabled) {
- LOG.debug(String.format("sinking %d rows, %d values, %d keys, %d parts",
- vrg.size,
- valueEval.length,
- keyEval.length,
- partitionEval.length));
- }
-
- try {
- // Evaluate the keys
- for (int i = 0; i < keyEval.length; i++) {
- keyEval[i].evaluate(vrg);
- }
-
- // Determine which rows we need to emit based on topN optimization
- int startResult = reducerHash.startVectorizedBatch(vrg.size);
- if (startResult == TopNHash.EXCLUDE) {
- return; // TopN wants us to exclude all rows.
- }
- // TODO: can we do this later/only for the keys that are needed? E.g. update vrg.selected.
- for (int i = 0; i < partitionEval.length; i++) {
- partitionEval[i].evaluate(vrg);
- }
- if (bucketEval != null) {
- for (int i = 0; i < bucketEval.length; i++) {
- bucketEval[i].evaluate(vrg);
- }
- }
- // run the vector evaluations
- for (int i = 0; i < valueEval.length; i++) {
- valueEval[i].evaluate(vrg);
- }
-
- boolean useTopN = startResult != TopNHash.FORWARD;
- // Go thru the batch once. If we are not using TopN, we will forward all things and be done.
- // If we are using topN, we will make the first key for each row and store/forward it.
- // Values, hashes and additional distinct rows will be handled in the 2nd pass in that case.
- for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
- int rowIndex = batchIndex;
- if (vrg.selectedInUse) {
- rowIndex = vrg.selected[batchIndex];
- }
- // First, make distrib key components for this row and determine distKeyLength.
- populatedCachedDistributionKeys(vrg, rowIndex, 0);
-
- // replace bucketing columns with hashcode % numBuckets
- if (bucketEval != null) {
- bucketNumber = computeBucketNumber(vrg, rowIndex, conf.getNumBuckets());
- cachedKeys[0][buckColIdxInKey] = new Text(String.valueOf(bucketNumber));
- }
- HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
- int distKeyLength = firstKey.getDistKeyLength();
- // Add first distinct expression, if any.
- if (numDistinctExprs > 0) {
- populateCachedDistinctKeys(vrg, rowIndex, 0);
- firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength);
- }
-
- final int hashCode;
-
- // distKeyLength doesn't include tag, but includes buckNum in cachedKeys[0]
- if (autoParallel && partitionEval.length > 0) {
- hashCode = computeMurmurHash(firstKey);
- } else {
- hashCode = computeHashCode(vrg, rowIndex);
- }
-
- firstKey.setHashCode(hashCode);
-
- if (useTopN) {
- /*
- * in case of TopN for windowing, we need to distinguish between
- * rows with null partition keys and rows with value 0 for partition keys.
- */
- boolean partkeysNull = conf.isPTFReduceSink() && partitionKeysAreNull(vrg, rowIndex);
- reducerHash.tryStoreVectorizedKey(firstKey, partkeysNull, batchIndex);
- } else {
- // No TopN, just forward the first key and all others.
- BytesWritable value = makeValueWritable(vrg, rowIndex);
- collect(firstKey, value);
- forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 0);
- }
- }
-
- if (!useTopN) return; // All done.
-
- // If we use topN, we have called tryStore on every key now. We can process the results.
- for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
- int result = reducerHash.getVectorizedBatchResult(batchIndex);
- if (result == TopNHash.EXCLUDE) continue;
- int rowIndex = batchIndex;
- if (vrg.selectedInUse) {
- rowIndex = vrg.selected[batchIndex];
- }
- // Compute value and hashcode - we'd either store or forward them.
- BytesWritable value = makeValueWritable(vrg, rowIndex);
- int distKeyLength = -1;
- int hashCode;
- if (result == TopNHash.FORWARD) {
- HiveKey firstKey = reducerHash.getVectorizedKeyToForward(batchIndex);
- distKeyLength = firstKey.getDistKeyLength();
- hashCode = firstKey.hashCode();
- collect(firstKey, value);
- } else {
- hashCode = reducerHash.getVectorizedKeyHashCode(batchIndex);
- reducerHash.storeValue(result, hashCode, value, true);
- distKeyLength = reducerHash.getVectorizedKeyDistLength(batchIndex);
- }
- // Now forward other the rows if there's multi-distinct (but see TODO in forward...).
- // Unfortunately, that means we will have to rebuild the cachedKeys. Start at 1.
- if (numDistinctExprs > 1) {
- populatedCachedDistributionKeys(vrg, rowIndex, 1);
- forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 1);
- }
- }
- } catch (SerDeException e) {
- throw new HiveException(e);
- } catch (IOException e) {
- throw new HiveException(e);
- }
- }
-
- /**
- * This function creates and forwards all the additional KVs for the multi-distinct case,
- * after the first (0th) KV pertaining to the row has already been stored or forwarded.
- * @param vrg the batch
- * @param rowIndex the row index in the batch
- * @param hashCode the partitioning hash code to use; same as for the first KV
- * @param value the value to use; same as for the first KV
- * @param distKeyLength the distribution key length of the first key; TODO probably extraneous
- * @param tag the tag
- * @param baseIndex the index in cachedKeys where the pre-evaluated distribution keys are stored
- */
- private void forwardExtraDistinctRows(VectorizedRowBatch vrg, int rowIndex,int hashCode,
- BytesWritable value, int distKeyLength, int tag, int baseIndex)
- throws HiveException, SerDeException, IOException {
- // TODO: We don't have to forward extra distinct rows immediately (same in non-vector) if
- // the first key has already been stored. There's few bytes difference between keys
- // for different distincts, and the value/etc. are all the same.
- // We could store deltas to re-gen extra rows when flushing TopN.
- for (int i = 1; i < numDistinctExprs; i++) {
- if (i != baseIndex) {
- System.arraycopy(cachedKeys[baseIndex], 0, cachedKeys[i], 0, numDistributionKeys);
- }
- populateCachedDistinctKeys(vrg, rowIndex, i);
- HiveKey hiveKey = toHiveKey(cachedKeys[i], tag, distKeyLength);
- hiveKey.setHashCode(hashCode);
- collect(hiveKey, value);
- }
- }
-
- /**
- * Populate distribution keys part of cachedKeys for a particular row from the batch.
- * @param vrg the batch
- * @param rowIndex the row index in the batch
- * @param index the cachedKeys index to write to
- */
- private void populatedCachedDistributionKeys(
- VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException {
- for (int i = 0; i < numDistributionKeys; i++) {
- int batchColumn = keyEval[i].getOutputColumn();
- ColumnVector vectorColumn = vrg.cols[batchColumn];
- cachedKeys[index][i] = keyWriters[i].writeValue(vectorColumn, rowIndex);
- }
- if (cachedKeys[index].length > numDistributionKeys) {
- cachedKeys[index][numDistributionKeys] = null;
- }
- }
-
- /**
- * Populate distinct keys part of cachedKeys for a particular row from the batch.
- * @param vrg the batch
- * @param rowIndex the row index in the batch
- * @param index the cachedKeys index to write to
- */
- private void populateCachedDistinctKeys(
- VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException {
- StandardUnion union;
- cachedKeys[index][numDistributionKeys] = union = new StandardUnion(
- (byte)index, new Object[distinctColIndices.get(index).size()]);
- Object[] distinctParameters = (Object[]) union.getObject();
- for (int distinctParamI = 0; distinctParamI < distinctParameters.length; distinctParamI++) {
- int distinctColIndex = distinctColIndices.get(index).get(distinctParamI);
- int batchColumn = keyEval[distinctColIndex].getOutputColumn();
- distinctParameters[distinctParamI] =
- keyWriters[distinctColIndex].writeValue(vrg.cols[batchColumn], rowIndex);
- }
- union.setTag((byte) index);
- }
+ public void processOp(Object data, int tag) throws HiveException {
+ VectorizedRowBatch vrg = (VectorizedRowBatch) data;
- private BytesWritable makeValueWritable(VectorizedRowBatch vrg, int rowIndex)
- throws HiveException, SerDeException {
- int length = valueEval.length;
-
- // in case of bucketed table, insert the bucket number as the last column in value
- if (bucketEval != null) {
- length -= 1;
- cachedValues[length] = new Text(String.valueOf(bucketNumber));
+ for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
+ Object row = getRowObject(vrg, batchIndex);
+ super.processOp(row, tag);
}
-
- for (int i = 0; i < length; i++) {
- int batchColumn = valueEval[i].getOutputColumn();
- ColumnVector vectorColumn = vrg.cols[batchColumn];
- cachedValues[i] = valueWriters[i].writeValue(vectorColumn, rowIndex);
- }
- // Serialize the value
- return (BytesWritable)valueSerializer.serialize(cachedValues, valueObjectInspector);
}
- private int computeHashCode(VectorizedRowBatch vrg, int rowIndex) throws HiveException {
- // Evaluate the HashCode
- int keyHashCode = 0;
- if (partitionEval.length == 0) {
- // If no partition cols, just distribute the data uniformly to provide better
- // load balance. If the requirement is to have a single reducer, we should set
- // the number of reducers to 1.
- // Use a constant seed to make the code deterministic.
- if (random == null) {
- random = new Random(12345);
- }
- keyHashCode = random.nextInt();
- } else {
- for (int p = 0; p < partitionEval.length; p++) {
- ColumnVector columnVector = vrg.cols[partitionEval[p].getOutputColumn()];
- Object partitionValue = partitionWriters[p].writeValue(columnVector, rowIndex);
- keyHashCode = keyHashCode
- * 31
- + ObjectInspectorUtils.hashCode(
- partitionValue,
- partitionWriters[p].getObjectInspector());
- }
- }
- return bucketNumber < 0 ? keyHashCode : keyHashCode * 31 + bucketNumber;
- }
-
- private boolean partitionKeysAreNull(VectorizedRowBatch vrg, int rowIndex)
+ private Object[] getRowObject(VectorizedRowBatch vrg, int rowIndex)
throws HiveException {
- if (partitionEval.length != 0) {
- for (int p = 0; p < partitionEval.length; p++) {
- ColumnVector columnVector = vrg.cols[partitionEval[p].getOutputColumn()];
- Object partitionValue = partitionWriters[p].writeValue(columnVector,
- rowIndex);
- if (partitionValue != null) {
- return false;
- }
+ int batchIndex = rowIndex;
+ if (vrg.selectedInUse) {
+ batchIndex = vrg.selected[rowIndex];
+ }
+ for (int i = 0; i < vrg.projectionSize; i++) {
+ ColumnVector vectorColumn = vrg.cols[vrg.projectedColumns[i]];
+ if (vectorColumn != null) {
+ singleRow[i] = rowWriters[i].writeValue(vectorColumn, batchIndex);
+ } else {
+ // Some columns from tables are not used.
+ singleRow[i] = null;
}
- return true;
- }
- return false;
- }
-
- private int computeBucketNumber(VectorizedRowBatch vrg, int rowIndex, int numBuckets) throws HiveException {
- int bucketNum = 0;
- for (int p = 0; p < bucketEval.length; p++) {
- ColumnVector columnVector = vrg.cols[bucketEval[p].getOutputColumn()];
- Object bucketValue = bucketWriters[p].writeValue(columnVector, rowIndex);
- bucketNum = bucketNum
- * 31
- + ObjectInspectorUtils.hashCode(
- bucketValue,
- bucketWriters[p].getObjectInspector());
- }
-
- if (bucketNum < 0) {
- bucketNum = -1 * bucketNum;
}
-
- return bucketNum % numBuckets;
- }
-
- static public String getOperatorName() {
- return "RS";
- }
-
- public VectorExpression[] getPartitionEval() {
- return partitionEval;
- }
-
- public void setPartitionEval(VectorExpression[] partitionEval) {
- this.partitionEval = partitionEval;
- }
-
- public VectorExpression[] getValueEval() {
- return valueEval;
- }
-
- public void setValueEval(VectorExpression[] valueEval) {
- this.valueEval = valueEval;
- }
-
- public VectorExpression[] getKeyEval() {
- return keyEval;
- }
-
- public void setKeyEval(VectorExpression[] keyEval) {
- this.keyEval = keyEval;
+ return singleRow;
}
}
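
For readers skimming the diff: the rewritten operator no longer serializes keys and values itself; it materializes one plain Object[] row per batch entry with VectorExpressionWriters and delegates to the row-mode ReduceSinkOperator. Below is a minimal, self-contained sketch of that extract-and-delegate pattern; the Batch and Writer types are hypothetical stand-ins for illustration, not Hive classes.

import java.util.Arrays;

public class RowExtractionSketch {

  /** Hypothetical stand-in for a column-oriented batch (cf. VectorizedRowBatch). */
  static class Batch {
    int size;                // number of logical rows in the batch
    boolean selectedInUse;   // whether 'selected' maps logical to physical rows
    int[] selected;          // logical -> physical row index mapping
    Object[][] cols;         // cols[c][r] = value of column c at physical row r
  }

  /** Hypothetical stand-in for a VectorExpressionWriter. */
  interface Writer {
    Object write(Object[] column, int physicalRow);
  }

  private final Writer[] rowWriters;
  private final Object[] singleRow;  // reused row buffer, as in the patch

  RowExtractionSketch(Writer[] rowWriters) {
    this.rowWriters = rowWriters;
    this.singleRow = new Object[rowWriters.length];
  }

  /** Mirrors the new processOp: one row-mode call per batch entry. */
  void processBatch(Batch vrg) {
    for (int batchIndex = 0; batchIndex < vrg.size; ++batchIndex) {
      int physical = vrg.selectedInUse ? vrg.selected[batchIndex] : batchIndex;
      for (int c = 0; c < rowWriters.length; c++) {
        Object[] column = vrg.cols[c];
        // Columns not used by the query may have no vector; emit null for them.
        singleRow[c] = (column == null) ? null : rowWriters[c].write(column, physical);
      }
      forwardRow(singleRow);  // stands in for super.processOp(row, tag)
    }
  }

  void forwardRow(Object[] row) {
    System.out.println(Arrays.toString(row));
  }

  public static void main(String[] args) {
    Writer identity = (column, row) -> column[row];
    Batch batch = new Batch();
    batch.size = 2;
    batch.selectedInUse = false;
    batch.cols = new Object[][] { {10, 20}, {"a", "b"} };
    new RowExtractionSketch(new Writer[] { identity, identity }).processBatch(batch);
    // prints: [10, a] then [20, b]
  }
}
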
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java?rev=1629111&r1=1629110&r2=1629111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java Fri Oct 3 04:46:49 2014
@@ -1060,6 +1060,31 @@ public final class VectorExpressionWrite
closure.assign(writers, oids);
}
+ /**
+ * Creates the value writers for a struct object inspector.
+ * Creates an appropriate output object inspector.
+ */
+ public static void processVectorInspector(
+ StructObjectInspector structObjInspector,
+ SingleOIDClosure closure)
+ throws HiveException {
+ List<? extends StructField> fields = structObjInspector.getAllStructFieldRefs();
+ VectorExpressionWriter[] writers = new VectorExpressionWriter[fields.size()];
+ List<ObjectInspector> oids = new ArrayList<ObjectInspector>(writers.length);
+ ArrayList<String> columnNames = new ArrayList<String>();
+ int i = 0;
+ for(StructField field : fields) {
+ ObjectInspector fieldObjInsp = field.getFieldObjectInspector();
+ writers[i] = VectorExpressionWriterFactory.
+ genVectorExpressionWritable(fieldObjInsp);
+ columnNames.add(field.getFieldName());
+ oids.add(writers[i].getObjectInspector());
+ i++;
+ }
+ ObjectInspector objectInspector = ObjectInspectorFactory.
+ getStandardStructObjectInspector(columnNames, oids);
+ closure.assign(writers, objectInspector);
+ }
/**
* Returns {@link VectorExpressionWriter} objects for the fields in the given
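
A hedged usage sketch for the new processVectorInspector helper, based only on the signature added above and mirroring how VectorReduceSinkOperator.initializeOp calls it in this commit; the field names on the caller side are assumptions for illustration.

import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

class ProcessVectorInspectorSketch {
  // Hypothetical holders; in the operator these are rowWriters / inputObjInspectors[0].
  private VectorExpressionWriter[] rowWriters;
  private ObjectInspector rowObjectInspector;

  void init(StructObjectInspector batchRowInspector) throws HiveException {
    // The closure receives one writer per struct field plus a standard struct
    // object inspector assembled from the writers' own field inspectors.
    VectorExpressionWriterFactory.processVectorInspector(
        batchRowInspector,
        new VectorExpressionWriterFactory.SingleOIDClosure() {
          @Override
          public void assign(VectorExpressionWriter[] writers, ObjectInspector oi) {
            rowWriters = writers;
            rowObjectInspector = oi;
          }
        });
  }
}
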
Added: hive/trunk/ql/src/test/queries/clientpositive/vector_count_distinct.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/vector_count_distinct.q?rev=1629111&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/vector_count_distinct.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/vector_count_distinct.q Fri Oct 3 04:46:49 2014
@@ -0,0 +1,108 @@
+SET hive.vectorized.execution.enabled=true;
+
+create table web_sales_txt
+(
+ ws_sold_date_sk int,
+ ws_sold_time_sk int,
+ ws_ship_date_sk int,
+ ws_item_sk int,
+ ws_bill_customer_sk int,
+ ws_bill_cdemo_sk int,
+ ws_bill_hdemo_sk int,
+ ws_bill_addr_sk int,
+ ws_ship_customer_sk int,
+ ws_ship_cdemo_sk int,
+ ws_ship_hdemo_sk int,
+ ws_ship_addr_sk int,
+ ws_web_page_sk int,
+ ws_web_site_sk int,
+ ws_ship_mode_sk int,
+ ws_warehouse_sk int,
+ ws_promo_sk int,
+ ws_order_number int,
+ ws_quantity int,
+ ws_wholesale_cost decimal(7,2),
+ ws_list_price decimal(7,2),
+ ws_sales_price decimal(7,2),
+ ws_ext_discount_amt decimal(7,2),
+ ws_ext_sales_price decimal(7,2),
+ ws_ext_wholesale_cost decimal(7,2),
+ ws_ext_list_price decimal(7,2),
+ ws_ext_tax decimal(7,2),
+ ws_coupon_amt decimal(7,2),
+ ws_ext_ship_cost decimal(7,2),
+ ws_net_paid decimal(7,2),
+ ws_net_paid_inc_tax decimal(7,2),
+ ws_net_paid_inc_ship decimal(7,2),
+ ws_net_paid_inc_ship_tax decimal(7,2),
+ ws_net_profit decimal(7,2)
+)
+row format delimited fields terminated by '|'
+stored as textfile;
+
+LOAD DATA LOCAL INPATH '../../data/files/web_sales_2k' OVERWRITE INTO TABLE web_sales_txt;
+
+------------------------------------------------------------------------------------------
+
+create table web_sales
+(
+ ws_sold_date_sk int,
+ ws_sold_time_sk int,
+ ws_ship_date_sk int,
+ ws_item_sk int,
+ ws_bill_customer_sk int,
+ ws_bill_cdemo_sk int,
+ ws_bill_hdemo_sk int,
+ ws_bill_addr_sk int,
+ ws_ship_customer_sk int,
+ ws_ship_cdemo_sk int,
+ ws_ship_hdemo_sk int,
+ ws_ship_addr_sk int,
+ ws_web_page_sk int,
+ ws_ship_mode_sk int,
+ ws_warehouse_sk int,
+ ws_promo_sk int,
+ ws_order_number int,
+ ws_quantity int,
+ ws_wholesale_cost decimal(7,2),
+ ws_list_price decimal(7,2),
+ ws_sales_price decimal(7,2),
+ ws_ext_discount_amt decimal(7,2),
+ ws_ext_sales_price decimal(7,2),
+ ws_ext_wholesale_cost decimal(7,2),
+ ws_ext_list_price decimal(7,2),
+ ws_ext_tax decimal(7,2),
+ ws_coupon_amt decimal(7,2),
+ ws_ext_ship_cost decimal(7,2),
+ ws_net_paid decimal(7,2),
+ ws_net_paid_inc_tax decimal(7,2),
+ ws_net_paid_inc_ship decimal(7,2),
+ ws_net_paid_inc_ship_tax decimal(7,2),
+ ws_net_profit decimal(7,2)
+)
+partitioned by
+(
+ ws_web_site_sk int
+)
+stored as orc
+tblproperties ("orc.stripe.size"="33554432", "orc.compress.size"="16384");
+
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+insert overwrite table web_sales
+partition (ws_web_site_sk)
+select ws_sold_date_sk, ws_sold_time_sk, ws_ship_date_sk, ws_item_sk,
+ ws_bill_customer_sk, ws_bill_cdemo_sk, ws_bill_hdemo_sk, ws_bill_addr_sk,
+ ws_ship_customer_sk, ws_ship_cdemo_sk, ws_ship_hdemo_sk, ws_ship_addr_sk,
+ ws_web_page_sk, ws_ship_mode_sk, ws_warehouse_sk, ws_promo_sk, ws_order_number,
+ ws_quantity, ws_wholesale_cost, ws_list_price, ws_sales_price, ws_ext_discount_amt,
+ ws_ext_sales_price, ws_ext_wholesale_cost, ws_ext_list_price, ws_ext_tax,
+ ws_coupon_amt, ws_ext_ship_cost, ws_net_paid, ws_net_paid_inc_tax, ws_net_paid_inc_ship,
+ ws_net_paid_inc_ship_tax, ws_net_profit, ws_web_site_sk from web_sales_txt;
+
+------------------------------------------------------------------------------------------
+
+explain
+select count(distinct ws_order_number) from web_sales;
+
+select count(distinct ws_order_number) from web_sales;
\ No newline at end of file