You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2013/05/30 17:20:01 UTC
svn commit: r1487895 - in /hive/branches/vectorization/ql/src:
java/org/apache/hadoop/hive/ql/exec/vector/
java/org/apache/hadoop/hive/ql/io/orc/
test/org/apache/hadoop/hive/ql/exec/vector/
Author: omalley
Date: Thu May 30 15:20:01 2013
New Revision: 1487895
URL: http://svn.apache.org/r1487895
Log:
HIVE-4603 VectorSelectOperator projections change the index of columns
for subsequent operators (Jitendra Nath Pandey via omalley)
Added:
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
Modified:
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java
hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSelectOperator.java
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java?rev=1487895&r1=1487894&r2=1487895&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java Thu May 30 15:20:01 2013
@@ -49,6 +49,9 @@ public class BytesColumnVector extends C
private byte[] buffer; // optional buffer to use when actually copying in data
private int nextFree; // next free position in buffer
+ // Reusable text object
+ private final Text textObject = new Text();
+
// Estimate that there will be 16 bytes per entry
static final int DEFAULT_BUFFER_SIZE = 16 * VectorizedRowBatch.DEFAULT_SIZE;
@@ -208,8 +211,9 @@ public class BytesColumnVector extends C
}
Writable result = null;
if (!isNull[index] && vector[index] != null) {
- result = new Text();
- ((Text) result).append(vector[index], start[index], length[index]);
+ textObject.clear();
+ textObject.append(vector[index], start[index], length[index]);
+ result = textObject;
} else {
result = NullWritable.get();
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java?rev=1487895&r1=1487894&r2=1487895&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java Thu May 30 15:20:01 2013
@@ -505,6 +505,9 @@ public class VectorMapOperator extends O
case TABLESCAN:
vectorOp = op.clone();
break;
+ case REDUCESINK:
+ vectorOp = new VectorReduceSinkOperator(vectorizationContext, op.getConf());
+ break;
default:
throw new HiveException("Operator: " + op.getName() + ", " +
"not vectorized");
Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java?rev=1487895&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java Thu May 30 15:20:01 2013
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.TerminalOperator;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Vectorized reduce sink: evaluates key, value and partition expressions on a
+ * {@link VectorizedRowBatch} and emits one serialized (key, value) pair per
+ * selected row to the output collector.
+ */
+public class VectorReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
+    implements Serializable {
+
+  private static final Log LOG = LogFactory.getLog(
+      VectorReduceSinkOperator.class.getName());
+
+  private static final long serialVersionUID = 1L;
+
+  private final VectorizationContext vContext;
+
+  /**
+   * The evaluators for the key columns. Key columns decide the sort order on
+   * the reducer side. Key columns are passed to the reducer in the "key".
+   */
+  protected transient VectorExpression[] keyEval;
+
+  /**
+   * The evaluators for the value columns. Value columns are passed to reducer
+   * in the "value".
+   */
+  protected transient VectorExpression[] valueEval;
+
+  /**
+   * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in
+   * Hive language). Partition columns decide the reducer that the current row
+   * goes to. Partition columns are not passed to reducer.
+   */
+  protected transient VectorExpression[] partitionEval;
+
+  private int numDistributionKeys;
+
+  private List<List<Integer>> distinctColIndices;
+
+  private int numDistinctExprs;
+
+  transient HiveKey keyWritable = new HiveKey();
+  transient Writable value;
+
+  transient Object[] cachedValues;
+  transient Object[][] cachedKeys;
+  transient Random random;
+
+  transient Serializer keySerializer;
+  transient boolean keyIsText;
+  transient Serializer valueSerializer;
+  transient int tag;
+  transient byte[] tagByte = new byte[1];
+
+  transient ObjectInspector keyObjectInspector;
+  transient ObjectInspector valueObjectInspector;
+  transient ObjectInspector[] partitionObjectInspectors;
+  // NOTE(review): not read anywhere in this class.
+  transient int [] keyHashCode = new int [VectorizedRowBatch.DEFAULT_SIZE];
+
+  /**
+   * Builds the vector expressions, serializers and object inspectors used by
+   * {@link #processOp}. Any failure is wrapped in a {@link HiveException}.
+   */
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    try {
+      vContext.setOperatorType(OperatorType.REDUCESINK);
+      keyEval = vContext.getVectorExpressions(conf.getKeyCols());
+      valueEval = vContext.getVectorExpressions(conf.getValueCols());
+      partitionEval = vContext.getVectorExpressions(conf.getPartitionCols());
+
+      numDistributionKeys = conf.getNumDistributionKeys();
+      distinctColIndices = conf.getDistinctColumnIndices();
+      numDistinctExprs = distinctColIndices.size();
+
+      // The tag byte is appended to keys in processOp; it must carry the tag
+      // configured for this sink, not the 0 of a freshly allocated array.
+      tag = conf.getTag();
+      tagByte[0] = (byte) tag;
+
+      TableDesc keyTableDesc = conf.getKeySerializeInfo();
+      keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
+          .newInstance();
+      keySerializer.initialize(null, keyTableDesc.getProperties());
+      keyIsText = keySerializer.getSerializedClass().equals(Text.class);
+
+      keyObjectInspector = vContext.createObjectInspector(keyEval,
+          conf.getOutputKeyColumnNames());
+
+      partitionObjectInspectors = new ObjectInspector[partitionEval.length];
+      for (int i = 0; i < partitionEval.length; i++) {
+        partitionObjectInspectors[i] = vContext.createObjectInspector(partitionEval[i]);
+      }
+
+      LOG.info(String.format("keyObjectInspector [%s]%s => %s",
+          keyObjectInspector.getClass(),
+          keyObjectInspector,
+          joinColumnNames(conf.getOutputKeyColumnNames())));
+
+      TableDesc valueTableDesc = conf.getValueSerializeInfo();
+      valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
+          .newInstance();
+      valueSerializer.initialize(null, valueTableDesc.getProperties());
+
+      valueObjectInspector = vContext.createObjectInspector(valueEval,
+          conf.getOutputValueColumnNames());
+
+      LOG.info(String.format("valueObjectInspector [%s]%s => %s",
+          valueObjectInspector.getClass(),
+          valueObjectInspector,
+          joinColumnNames(conf.getOutputValueColumnNames())));
+
+      // One key array per DISTINCT expression (a single one without DISTINCT);
+      // with DISTINCT each key array gets one extra slot.
+      int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1;
+      int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 :
+          numDistributionKeys;
+      cachedKeys = new Object[numKeys][keyLen];
+      cachedValues = new Object[valueEval.length];
+
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  /**
+   * Emits one (key, value) pair per selected row of the incoming batch.
+   *
+   * @param row the incoming {@link VectorizedRowBatch}
+   * @param tag when -1 the serialized key is emitted as-is; otherwise the
+   *        configured tag byte is appended to it
+   */
+  @Override
+  public void processOp(Object row, int tag) throws HiveException {
+    VectorizedRowBatch vrg = (VectorizedRowBatch) row;
+
+    // Logged once per batch, so keep it out of INFO level.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("sinking %d rows, %d values, %d keys, %d parts",
+          vrg.size,
+          valueEval.length,
+          keyEval.length,
+          partitionEval.length));
+    }
+
+    try {
+      // Run the vector evaluations: partition, value, then key columns.
+      for (int i = 0; i < partitionEval.length; i++) {
+        partitionEval[i].evaluate(vrg);
+      }
+      for (int i = 0; i < valueEval.length; i++) {
+        valueEval[i].evaluate(vrg);
+      }
+      for (int i = 0; i < keyEval.length; i++) {
+        keyEval[i].evaluate(vrg);
+      }
+
+      // NOTE(review): sized for the distribution keys only; more key
+      // expressions than numDistributionKeys (the DISTINCT case) is unsupported.
+      Object[] distributionKeys = new Object[numDistributionKeys];
+
+      // Emit a (k,v) pair for each row in the batch
+      for (int j = 0; j < vrg.size; ++j) {
+        int rowIndex = vrg.selectedInUse ? vrg.selected[j] : j;
+
+        for (int i = 0; i < valueEval.length; i++) {
+          ColumnVector vectorColumn = vrg.cols[valueEval[i].getOutputColumn()];
+          cachedValues[i] = vectorColumn.getWritableObject(rowIndex);
+        }
+        // Serialize the value
+        value = valueSerializer.serialize(cachedValues, valueObjectInspector);
+
+        for (int i = 0; i < keyEval.length; i++) {
+          ColumnVector vectorColumn = vrg.cols[keyEval[i].getOutputColumn()];
+          distributionKeys[i] = vectorColumn.getWritableObject(rowIndex);
+        }
+        // no distinct key
+        System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys);
+
+        // Serialize the keys, append the tag and emit
+        for (int i = 0; i < cachedKeys.length; i++) {
+          Writable key = keySerializer.serialize(cachedKeys[i], keyObjectInspector);
+          if (keyIsText) {
+            Text textKey = (Text) key;
+            setKeyWritable(textKey.getBytes(), textKey.getLength(), tag);
+          } else {
+            // Must be BytesWritable
+            BytesWritable bytesKey = (BytesWritable) key;
+            setKeyWritable(bytesKey.getBytes(), bytesKey.getLength(), tag);
+          }
+          keyWritable.setHashCode(computeHashCode(vrg, rowIndex));
+          if (out != null) {
+            out.collect(keyWritable, value);
+            // Since this is a terminal operator, update counters explicitly -
+            // forward is not called
+            if (counterNameToEnum != null) {
+              ++outputRows;
+              if (outputRows % 1000 == 0) {
+                incrCounter(numOutputRowsCntr, outputRows);
+                outputRows = 0;
+              }
+            }
+          }
+        }
+      }
+    } catch (SerDeException e) {
+      throw new HiveException(e);
+    } catch (IOException e) {
+      throw new HiveException(e);
+    }
+  }
+
+  /**
+   * Copies a serialized key into {@link #keyWritable}, appending
+   * {@link #tagByte} unless {@code rowTag} is -1.
+   */
+  private void setKeyWritable(byte[] keyBytes, int keyLength, int rowTag) {
+    if (rowTag == -1) {
+      keyWritable.set(keyBytes, 0, keyLength);
+    } else {
+      keyWritable.setSize(keyLength + 1);
+      System.arraycopy(keyBytes, 0, keyWritable.get(), 0, keyLength);
+      keyWritable.get()[keyLength] = tagByte[0];
+    }
+  }
+
+  /**
+   * Computes the partitioning hash of one row. Without partition columns, a
+   * fixed-seed pseudo-random value spreads rows uniformly and deterministically
+   * across reducers. Otherwise the partition values are combined as hash * 31 + h.
+   */
+  private int computeHashCode(VectorizedRowBatch vrg, int rowIndex) {
+    if (partitionEval.length == 0) {
+      // If the requirement is to have a single reducer, the number of
+      // reducers should be set to 1.
+      if (random == null) {
+        random = new Random(12345);
+      }
+      return random.nextInt();
+    }
+    int hash = 0;
+    for (int p = 0; p < partitionEval.length; p++) {
+      ColumnVector partitionColumn = vrg.cols[partitionEval[p].getOutputColumn()];
+      // Each partition column must be hashed with its own object inspector.
+      hash = hash * 31
+          + ObjectInspectorUtils.hashCode(
+              partitionColumn.getWritableObject(rowIndex),
+              partitionObjectInspectors[p]);
+    }
+    return hash;
+  }
+
+  /**
+   * Renders column names for logging, each preceded by one space
+   * (e.g. {@code " a b"}).
+   */
+  private static String joinColumnNames(List<String> columnNames) {
+    StringBuilder sb = new StringBuilder();
+    for (String colName : columnNames) {
+      sb.append(' ').append(colName);
+    }
+    return sb.toString();
+  }
+
+  public VectorReduceSinkOperator (
+      VectorizationContext context,
+      OperatorDesc conf) {
+    this.vContext = context;
+    this.conf = (ReduceSinkDesc) conf;
+  }
+
+  /**
+   * @return the name of the operator
+   */
+  @Override
+  public String getName() {
+    return getOperatorName();
+  }
+
+  static public String getOperatorName() {
+    return "RS";
+  }
+
+  @Override
+  public OperatorType getType() {
+    return OperatorType.REDUCESINK;
+  }
+
+  @Override
+  public boolean opAllowedBeforeMapJoin() {
+    return false;
+  }
+
+}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java?rev=1487895&r1=1487894&r2=1487895&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java Thu May 30 15:20:01 2013
@@ -40,9 +40,10 @@ public class VectorSelectOperator extend
protected transient VectorExpression[] vExpressions;
- VectorizedRowBatch output;
private final VectorizationContext vContext;
+ private int [] projectedColumns = null;
+
public VectorSelectOperator(VectorizationContext ctxt, OperatorDesc conf) {
this.vContext = ctxt;
this.conf = (SelectDesc) conf;
@@ -61,19 +62,19 @@ public class VectorSelectOperator extend
vExpressions = new VectorExpression[colList.size()];
for (int i = 0; i < colList.size(); i++) {
vExpressions[i] = vContext.getVectorExpression(colList.get(i));
+ String columnName = conf.getOutputColumnNames().get(i);
+ // Update column map with output column names
+ vContext.addToColumnMap(columnName, vExpressions[i].getOutputColumn());
}
- output = new VectorizedRowBatch(colList.size(),
- VectorizedRowBatch.DEFAULT_SIZE);
initializeChildren(hconf);
+ projectedColumns = new int [vExpressions.length];
+ for (int i = 0; i < projectedColumns.length; i++) {
+ projectedColumns[i] = vExpressions[i].getOutputColumn();
+ }
}
+ /**
+ * Sets the select expressions evaluated by this operator directly.
+ * NOTE(review): unlike initializeOp, this neither registers output column
+ * names with the VectorizationContext nor recomputes projectedColumns.
+ */
public void setSelectExpressions(VectorExpression[] exprs) {
this.vExpressions = exprs;
- output = new VectorizedRowBatch(exprs.length, VectorizedRowBatch.DEFAULT_SIZE);
- }
-
- public VectorizedRowBatch getOutput() {
- return output;
}
@Override
@@ -95,15 +96,18 @@ public class VectorSelectOperator extend
}
}
- //Prepare output, shallow vector copy
- output.selectedInUse = vrg.selectedInUse;
- output.selected = vrg.selected;
- output.size = vrg.size;
+ //Prepare output, set the projections
+ int[] originalProjections = vrg.projectedColumns;
+ int originalProjectionSize = vrg.projectionSize;
+ vrg.projectionSize = vExpressions.length;
for (int i = 0; i < vExpressions.length; i++) {
- output.cols[i] = vrg.cols[vExpressions[i].getOutputColumn()];
+ vrg.projectedColumns[i] = vExpressions[i].getOutputColumn();
}
- output.numCols = vExpressions.length;
- forward(output, outputObjInspector);
+ forward(vrg, outputObjInspector);
+
+ // Revert the projected columns back, because vrg will be re-used.
+ vrg.projectionSize = originalProjectionSize;
+ vrg.projectedColumns = originalProjections;
}
/**
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1487895&r1=1487894&r2=1487895&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Thu May 30 15:20:01 2013
@@ -88,6 +88,7 @@ import org.apache.hadoop.hive.ql.udf.gen
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
/**
@@ -192,7 +193,6 @@ public class VectorizationContext {
private VectorExpression getVectorExpression(ExprNodeColumnDesc
exprDesc) {
-
int columnNum = getInputColumnIndex(exprDesc.getColumn());
VectorExpression expr = null;
switch (opType) {
@@ -1074,7 +1074,8 @@ public class VectorizationContext {
String columnType = vectorExpression.getOutputType();
if (columnType.equalsIgnoreCase("long") ||
columnType.equalsIgnoreCase("bigint") ||
- columnType.equalsIgnoreCase("int")) {
+ columnType.equalsIgnoreCase("int") ||
+ columnType.equalsIgnoreCase("smallint")) {
return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
} else if (columnType.equalsIgnoreCase("double")) {
return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
@@ -1084,5 +1085,23 @@ public class VectorizationContext {
throw new HiveException(String.format("Must implement type %s", columnType));
}
}
+
+  /**
+   * Creates a standard struct object inspector whose fields are named by
+   * {@code columnNames} and typed by the output type of the matching vector
+   * expression.
+   *
+   * @param vectorExpressions expressions supplying one field each, in order
+   * @param columnNames field names, parallel to {@code vectorExpressions}
+   * @return the struct object inspector
+   * @throws HiveException if the two inputs differ in length, or if an
+   *         expression's output type has no object inspector
+   */
+  public ObjectInspector createObjectInspector(
+      VectorExpression[] vectorExpressions, List<String> columnNames)
+      throws HiveException {
+    // A length mismatch would otherwise fail deep inside the struct factory
+    // or silently drop trailing fields.
+    if (vectorExpressions.length != columnNames.size()) {
+      throw new HiveException(String.format(
+          "Mismatched struct definition: %d expressions but %d column names %s",
+          vectorExpressions.length, columnNames.size(), columnNames));
+    }
+    List<ObjectInspector> oids =
+        new ArrayList<ObjectInspector>(vectorExpressions.length);
+    for (VectorExpression vexpr : vectorExpressions) {
+      oids.add(createObjectInspector(vexpr));
+    }
+    return ObjectInspectorFactory.getStandardStructObjectInspector(columnNames,
+        oids);
+  }
+
+  /**
+   * Maps {@code columnName} to batch column {@code outputColumn} in the
+   * column map; does nothing when no column map is present.
+   */
+  public void addToColumnMap(String columnName, int outputColumn) {
+    if (columnMap == null) {
+      return;
+    }
+    columnMap.put(columnName, outputColumn);
+  }
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java?rev=1487895&r1=1487894&r2=1487895&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java Thu May 30 15:20:01 2013
@@ -105,7 +105,8 @@ public class VectorizedColumnarSerDe ext
byteRow.resetValid(numCols);
- for (int k = 0; k < numCols; k++) {
+ for (int p = 0; p < batch.projectionSize; p++) {
+ int k = batch.projectedColumns[p];
ObjectInspector foi = fields.get(k).getFieldObjectInspector();
ColumnVector currentColVector = batch.cols[k];
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java?rev=1487895&r1=1487894&r2=1487895&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java Thu May 30 15:20:01 2013
@@ -35,6 +35,8 @@ public class VectorizedRowBatch implemen
public ColumnVector[] cols; // a vector for each column
public int size; // number of rows that qualify (i.e. haven't been filtered out)
public int[] selected; // array of positions of selected values
+ public int[] projectedColumns;
+ public int projectionSize;
/*
* If no filtering has been applied yet, selectedInUse is false,
@@ -80,6 +82,13 @@ public class VectorizedRowBatch implemen
selectedInUse = false;
this.cols = new ColumnVector[numCols];
writableRow = new Writable[numCols];
+ projectedColumns = new int[numCols];
+
+ // Initially all columns are projected and in the same order
+ projectionSize = numCols;
+ for (int i = 0; i < numCols; i++) {
+ projectedColumns[i] = i;
+ }
}
public void initRowIterator(){
@@ -92,12 +101,14 @@ public class VectorizedRowBatch implemen
}
if (selectedInUse) {
int i = selected[rowIteratorIndex];
- for (int c = 0; c < numCols; c++) {
+ for (int k = 0; k < projectionSize; k++) {
+ int c = this.projectedColumns[k];
writableRow[c] = cols[c].getWritableObject(i);
}
} else {
int i = rowIteratorIndex;
- for (int c = 0; c < numCols; c++) {
+ for (int k = 0; k < projectionSize; k++) {
+ int c = this.projectedColumns[k];
writableRow[c] = cols[c].getWritableObject(i);
}
}
@@ -123,7 +134,8 @@ public class VectorizedRowBatch implemen
for (int j = 0; j < size; j++) {
int i = selected[j];
int colIndex = 0;
- for (ColumnVector cv : cols) {
+ for (int k = 0; k < projectionSize; k++) {
+ ColumnVector cv = cols[this.projectedColumns[k]];
if (cv.isRepeating) {
b.append(cv.getWritableObject(0).toString());
} else {
@@ -141,7 +153,8 @@ public class VectorizedRowBatch implemen
} else {
for (int i = 0; i < size; i++) {
int colIndex = 0;
- for (ColumnVector cv : cols) {
+ for (int k = 0; k < projectionSize; k++) {
+ ColumnVector cv = cols[this.projectedColumns[k]];
if (cv.isRepeating) {
b.append(cv.getWritableObject(0).toString());
} else {
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java?rev=1487895&r1=1487894&r2=1487895&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java Thu May 30 15:20:01 2013
@@ -55,7 +55,8 @@ public class VectorizedOrcSerde extends
} else {
index = i;
}
- for (int k = 0; k < batch.numCols; k++) {
+ for (int p = 0; p < batch.projectionSize; p++) {
+ int k = batch.projectedColumns[p];
Writable w;
if (batch.cols[k].isRepeating) {
w = batch.cols[k].getWritableObject(0);
Modified: hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSelectOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSelectOperator.java?rev=1487895&r1=1487894&r2=1487895&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSelectOperator.java (original)
+++ hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSelectOperator.java Thu May 30 15:20:01 2013
@@ -25,44 +25,66 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.exec.vector.expressions.gen.LongColAddLongColumn;
import org.apache.hadoop.hive.ql.exec.vector.util.VectorizedRowGroupGenUtil;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.junit.Test;
public class TestVectorSelectOperator {
+  /**
+   * VectorSelectOperator whose forward() validates the projected batch
+   * instead of passing it on to child operators.
+   */
+  static class ValidatorVectorSelectOperator extends VectorSelectOperator {
+
+    private static final long serialVersionUID = 1L;
+
+    public ValidatorVectorSelectOperator(VectorizationContext ctxt, OperatorDesc conf) {
+      super(ctxt, conf);
+    }
+
+    /**
+     * Checks that exactly columns 2 and 3 are projected, that column 2 holds
+     * col0 + col1 row by row, and that column 3 matches the input column 3.
+     */
+    @Override
+    public void forward(Object row, ObjectInspector rowInspector) throws HiveException {
+      VectorizedRowBatch batch = (VectorizedRowBatch) row;
+      int[] projected = batch.projectedColumns;
+
+      assertEquals(2, batch.projectionSize);
+      assertEquals(2, projected[0]);
+      assertEquals(3, projected[1]);
+
+      LongColumnVector sum = (LongColumnVector) batch.cols[projected[0]];
+      LongColumnVector copy = (LongColumnVector) batch.cols[projected[1]];
+      LongColumnVector col0 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col1 = (LongColumnVector) batch.cols[1];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[2];
+      LongColumnVector col3 = (LongColumnVector) batch.cols[3];
+
+      for (int r = 0; r < VectorizedRowBatch.DEFAULT_SIZE; r++) {
+        long expectedSum = col0.vector[r] + col1.vector[r];
+        assertEquals(expectedSum, sum.vector[r]);
+        assertEquals(col2.vector[r], sum.vector[r]);
+        assertEquals(col3.vector[r], copy.vector[r]);
+      }
+    }
+  }
+
+ /**
+ * Runs a generated batch through a select of col0 + col1 (written to column 2)
+ * and identity(col3); the actual checks live in
+ * ValidatorVectorSelectOperator.forward.
+ * NOTE(review): nothing here asserts that forward() was reached, so the test
+ * would also pass if processOp never forwarded the batch.
+ */
@Test
public void testSelectOperator() throws HiveException {
- VectorSelectOperator vso = new VectorSelectOperator(null, new SelectDesc(false));
+
+ ValidatorVectorSelectOperator vso = new ValidatorVectorSelectOperator(null, new SelectDesc(
+ false));
VectorizedRowBatch vrg = VectorizedRowGroupGenUtil.getVectorizedRowBatch(
VectorizedRowBatch.DEFAULT_SIZE, 4, 17);
- LongColAddLongColumn lcalcExpr = new LongColAddLongColumn(0,1,2);
+ LongColAddLongColumn lcalcExpr = new LongColAddLongColumn(0, 1, 2);
IdentityExpression iexpr = new IdentityExpression(3, "long");
- VectorExpression [] ves = new VectorExpression [] { lcalcExpr, iexpr };
+ VectorExpression[] ves = new VectorExpression[] {lcalcExpr, iexpr};
vso.setSelectExpressions(ves);
+ // Validation happens inside the overridden forward() called by processOp.
vso.processOp(vrg, 0);
-
- VectorizedRowBatch output = vso.getOutput();
-
- assertEquals(2, output.numCols);
-
- LongColumnVector out0 = (LongColumnVector) output.cols[0];
- LongColumnVector out1 = (LongColumnVector) output.cols[1];
-
- LongColumnVector in0 = (LongColumnVector) vrg.cols[0];
- LongColumnVector in1 = (LongColumnVector) vrg.cols[1];
- LongColumnVector in2 = (LongColumnVector) vrg.cols[2];
- LongColumnVector in3 = (LongColumnVector) vrg.cols[3];
-
- for (int i = 0; i < VectorizedRowBatch.DEFAULT_SIZE; i ++) {
- assertEquals(in0.vector[i]+in1.vector[i], out0.vector[i]);
- assertEquals(in2.vector[i], out0.vector[i]);
- assertEquals(in3.vector[i], out1.vector[i]);
- }
}
}