You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2013/05/30 17:20:01 UTC

svn commit: r1487895 - in /hive/branches/vectorization/ql/src: java/org/apache/hadoop/hive/ql/exec/vector/ java/org/apache/hadoop/hive/ql/io/orc/ test/org/apache/hadoop/hive/ql/exec/vector/

Author: omalley
Date: Thu May 30 15:20:01 2013
New Revision: 1487895

URL: http://svn.apache.org/r1487895
Log:
HIVE-4603 VectorSelectOperator projections change the index of columns
for subsequent operators (Jitendra Nath Pandey via omalley)

Added:
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
Modified:
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java
    hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSelectOperator.java

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java?rev=1487895&r1=1487894&r2=1487895&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java Thu May 30 15:20:01 2013
@@ -49,6 +49,9 @@ public class BytesColumnVector extends C
   private byte[] buffer;   // optional buffer to use when actually copying in data
   private int nextFree;    // next free position in buffer
 
+  // Reusable text object
+  private final Text textObject = new Text();
+
   // Estimate that there will be 16 bytes per entry
   static final int DEFAULT_BUFFER_SIZE = 16 * VectorizedRowBatch.DEFAULT_SIZE;
 
@@ -208,8 +211,9 @@ public class BytesColumnVector extends C
     }
     Writable result = null;
     if (!isNull[index] && vector[index] != null) {
-      result = new Text();
-      ((Text) result).append(vector[index], start[index], length[index]);
+      textObject.clear();
+      textObject.append(vector[index], start[index], length[index]);
+      result = textObject;
     } else {
       result = NullWritable.get();
     }

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java?rev=1487895&r1=1487894&r2=1487895&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java Thu May 30 15:20:01 2013
@@ -505,6 +505,9 @@ public class VectorMapOperator extends O
       case TABLESCAN:
         vectorOp = op.clone();
         break;
+      case REDUCESINK:
+        vectorOp = new VectorReduceSinkOperator(vectorizationContext, op.getConf());
+        break;
       default:
         throw new HiveException("Operator: " + op.getName() + ", " +
             "not vectorized");

Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java?rev=1487895&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java Thu May 30 15:20:01 2013
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.TerminalOperator;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+public class VectorReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
+  implements Serializable {
+
+  private static final Log LOG = LogFactory.getLog(
+      VectorReduceSinkOperator.class.getName());
+
+  private static final long serialVersionUID = 1L;
+
+  private final VectorizationContext vContext;
+
+  /**
+   * The evaluators for the key columns. Key columns decide the sort order on
+   * the reducer side. Key columns are passed to the reducer in the "key".
+   */
+  protected transient VectorExpression[] keyEval;
+  /**
+   * The evaluators for the value columns. Value columns are passed to reducer
+   * in the "value".
+   */
+  protected transient VectorExpression[] valueEval;
+
+  /**
+   * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in
+   * Hive language). Partition columns decide the reducer that the current row
+   * goes to. Partition columns are not passed to reducer.
+   */
+  protected transient VectorExpression[] partitionEval;
+
+  private int numDistributionKeys;
+
+  private List<List<Integer>> distinctColIndices;
+
+  private int numDistinctExprs;
+
+  transient HiveKey keyWritable = new HiveKey();
+  transient Writable value;
+
+  transient Object[] cachedValues;
+  transient Object[][] cachedKeys;
+  transient Random random;
+
+  transient Serializer keySerializer;
+  transient boolean keyIsText;
+  transient Serializer valueSerializer;
+  transient int tag;
+  transient byte[] tagByte = new byte[1];
+
+  transient ObjectInspector keyObjectInspector;
+  transient ObjectInspector valueObjectInspector;
+  transient ObjectInspector[] partitionObjectInspectors;
+  transient int [] keyHashCode = new int [VectorizedRowBatch.DEFAULT_SIZE];
+
+
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    try {
+      vContext.setOperatorType(OperatorType.REDUCESINK);
+      keyEval = vContext.getVectorExpressions(conf.getKeyCols());
+      valueEval = vContext.getVectorExpressions(conf.getValueCols());
+      partitionEval = vContext.getVectorExpressions(conf.getPartitionCols());
+
+      numDistributionKeys = conf.getNumDistributionKeys();
+      distinctColIndices = conf.getDistinctColumnIndices();
+      numDistinctExprs = distinctColIndices.size();
+
+      TableDesc keyTableDesc = conf.getKeySerializeInfo();
+      keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
+          .newInstance();
+      keySerializer.initialize(null, keyTableDesc.getProperties());
+      keyIsText = keySerializer.getSerializedClass().equals(Text.class);
+
+      keyObjectInspector = vContext.createObjectInspector(keyEval,
+          conf.getOutputKeyColumnNames());
+
+      partitionObjectInspectors = new ObjectInspector[partitionEval.length];
+      for (int i = 0; i < partitionEval.length; i++) {
+        partitionObjectInspectors[i] = vContext.createObjectInspector(partitionEval[i]);
+      }
+
+      String colNames = "";
+      for(String colName : conf.getOutputKeyColumnNames()) {
+        colNames = String.format("%s %s", colNames, colName);
+      }
+
+      LOG.info(String.format("keyObjectInspector [%s]%s => %s",
+          keyObjectInspector.getClass(),
+          keyObjectInspector,
+          colNames));
+
+      conf.getOutputKeyColumnNames();
+      conf.getOutputValueColumnNames();
+
+      //keyObjectInspector = ObjectInspectorFactory.
+
+      TableDesc valueTableDesc = conf.getValueSerializeInfo();
+      valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
+          .newInstance();
+      valueSerializer.initialize(null, valueTableDesc.getProperties());
+
+      valueObjectInspector = vContext.createObjectInspector (valueEval,
+          conf.getOutputValueColumnNames());
+
+      colNames = "";
+      for(String colName : conf.getOutputValueColumnNames()) {
+        colNames = String.format("%s %s", colNames, colName);
+      }
+
+      LOG.info(String.format("valueObjectInspector [%s]%s => %s",
+          valueObjectInspector.getClass(),
+          valueObjectInspector,
+          colNames));
+
+      int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1;
+      int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 :
+        numDistributionKeys;
+      cachedKeys = new Object[numKeys][keyLen];
+      cachedValues = new Object[valueEval.length];
+
+    } catch(Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  @Override
+  public void processOp(Object row, int tag) throws HiveException {
+    VectorizedRowBatch vrg = (VectorizedRowBatch) row;
+
+    LOG.info(String.format("sinking %d rows, %d values, %d keys, %d parts",
+        vrg.size,
+        valueEval.length,
+        keyEval.length,
+        partitionEval.length));
+
+    try {
+
+      for (int i = 0; i < partitionEval.length; i++) {
+        partitionEval[i].evaluate(vrg);
+      }
+
+      // run the vector evaluations
+      for (int i = 0; i < valueEval.length; i++) {
+         valueEval[i].evaluate(vrg);
+      }
+      // Evaluate the keys
+      for (int i = 0; i < keyEval.length; i++) {
+        keyEval[i].evaluate(vrg);
+      }
+
+      Object[] distributionKeys = new Object[numDistributionKeys];
+
+      // Emit a (k,v) pair for each row in the batch
+      //
+      for (int j = 0 ; j < vrg.size; ++j) {
+        int rowIndex = j;
+        if (vrg.selectedInUse) {
+          rowIndex = vrg.selected[j];
+        }
+        for (int i = 0; i < valueEval.length; i++) {
+          int batchColumn = valueEval[i].getOutputColumn();
+          ColumnVector vectorColumn = vrg.cols[batchColumn];
+          cachedValues[i] = vectorColumn.getWritableObject(rowIndex);
+        }
+        // Serialize the value
+        value = valueSerializer.serialize(cachedValues, valueObjectInspector);
+
+        for (int i = 0; i < keyEval.length; i++) {
+          int batchColumn = keyEval[i].getOutputColumn();
+          ColumnVector vectorColumn = vrg.cols[batchColumn];
+          distributionKeys[i] = vectorColumn.getWritableObject(rowIndex);
+        }
+        // no distinct key
+        System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys);
+        // Serialize the keys and append the tag
+        for (int i = 0; i < cachedKeys.length; i++) {
+          if (keyIsText) {
+            Text key = (Text) keySerializer.serialize(cachedKeys[i],
+                keyObjectInspector);
+            if (tag == -1) {
+              keyWritable.set(key.getBytes(), 0, key.getLength());
+            } else {
+              int keyLength = key.getLength();
+              keyWritable.setSize(keyLength + 1);
+              System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
+              keyWritable.get()[keyLength] = tagByte[0];
+            }
+          } else {
+            // Must be BytesWritable
+            BytesWritable key = (BytesWritable) keySerializer.serialize(
+                cachedKeys[i], keyObjectInspector);
+            if (tag == -1) {
+              keyWritable.set(key.getBytes(), 0, key.getLength());
+            } else {
+              int keyLength = key.getLength();
+              keyWritable.setSize(keyLength + 1);
+              System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
+              keyWritable.get()[keyLength] = tagByte[0];
+            }
+          }
+          // Evaluate the HashCode
+          int keyHashCode = 0;
+          if (partitionEval.length == 0) {
+            // If no partition cols, just distribute the data uniformly to provide
+            // better
+            // load balance. If the requirement is to have a single reducer, we
+            // should set
+            // the number of reducers to 1.
+            // Use a constant seed to make the code deterministic.
+            if (random == null) {
+              random = new Random(12345);
+            }
+            keyHashCode = random.nextInt();
+          } else {
+            for (int p = 0; p < partitionEval.length; p++) {
+              keyHashCode = keyHashCode
+                  * 31
+                  + ObjectInspectorUtils.hashCode(
+                      vrg.cols[partitionEval[p].getOutputColumn()].getWritableObject(rowIndex),
+                      partitionObjectInspectors[i]);
+            }
+          }
+          keyWritable.setHashCode(keyHashCode);
+          if (out != null) {
+            out.collect(keyWritable, value);
+            // Since this is a terminal operator, update counters explicitly -
+            // forward is not called
+            if (counterNameToEnum != null) {
+              ++outputRows;
+              if (outputRows % 1000 == 0) {
+                incrCounter(numOutputRowsCntr, outputRows);
+                outputRows = 0;
+              }
+            }
+          }
+        }
+      }
+    } catch (SerDeException e) {
+      throw new HiveException(e);
+    } catch (IOException e) {
+      throw new HiveException(e);
+    }
+  }
+
+  public VectorReduceSinkOperator (
+      VectorizationContext context,
+      OperatorDesc conf) {
+    this.vContext = context;
+    this.conf = (ReduceSinkDesc) conf;
+  }
+
+  /**
+   * @return the name of the operator
+   */
+  @Override
+  public String getName() {
+    return getOperatorName();
+  }
+
+  static public String getOperatorName() {
+    return "RS";
+  }
+
+  @Override
+  public OperatorType getType() {
+    return OperatorType.REDUCESINK;
+  }
+
+  @Override
+  public boolean opAllowedBeforeMapJoin() {
+    return false;
+  }
+
+}

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java?rev=1487895&r1=1487894&r2=1487895&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java Thu May 30 15:20:01 2013
@@ -40,9 +40,10 @@ public class VectorSelectOperator extend
 
   protected transient VectorExpression[] vExpressions;
 
-  VectorizedRowBatch output;
   private final VectorizationContext vContext;
 
+  private int [] projectedColumns = null;
+
   public VectorSelectOperator(VectorizationContext ctxt, OperatorDesc conf) {
     this.vContext = ctxt;
     this.conf = (SelectDesc) conf;
@@ -61,19 +62,19 @@ public class VectorSelectOperator extend
     vExpressions = new VectorExpression[colList.size()];
     for (int i = 0; i < colList.size(); i++) {
       vExpressions[i] = vContext.getVectorExpression(colList.get(i));
+      String columnName = conf.getOutputColumnNames().get(i);
+      // Update column map with output column names
+      vContext.addToColumnMap(columnName, vExpressions[i].getOutputColumn());
     }
-    output = new VectorizedRowBatch(colList.size(),
-        VectorizedRowBatch.DEFAULT_SIZE);
     initializeChildren(hconf);
+    projectedColumns = new int [vExpressions.length];
+    for (int i = 0; i < projectedColumns.length; i++) {
+      projectedColumns[i] = vExpressions[i].getOutputColumn();
+    }
   }
 
   public void setSelectExpressions(VectorExpression[] exprs) {
     this.vExpressions = exprs;
-    output = new VectorizedRowBatch(exprs.length, VectorizedRowBatch.DEFAULT_SIZE);
-  }
-
-  public VectorizedRowBatch getOutput() {
-    return output;
   }
 
   @Override
@@ -95,15 +96,18 @@ public class VectorSelectOperator extend
       }
     }
 
-    //Prepare output, shallow vector copy
-    output.selectedInUse = vrg.selectedInUse;
-    output.selected = vrg.selected;
-    output.size = vrg.size;
+    //Prepare output, set the projections
+    int[] originalProjections = vrg.projectedColumns;
+    int originalProjectionSize = vrg.projectionSize;
+    vrg.projectionSize = vExpressions.length;
     for (int i = 0; i < vExpressions.length; i++) {
-      output.cols[i] = vrg.cols[vExpressions[i].getOutputColumn()];
+      vrg.projectedColumns[i] = vExpressions[i].getOutputColumn();
     }
-    output.numCols = vExpressions.length;
-    forward(output, outputObjInspector);
+    forward(vrg, outputObjInspector);
+
+    // Revert the projected columns back, because vrg will be re-used.
+    vrg.projectionSize = originalProjectionSize;
+    vrg.projectedColumns = originalProjections;
   }
 
   /**

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1487895&r1=1487894&r2=1487895&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Thu May 30 15:20:01 2013
@@ -88,6 +88,7 @@ import org.apache.hadoop.hive.ql.udf.gen
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 
 /**
@@ -192,7 +193,6 @@ public class VectorizationContext {
 
   private VectorExpression getVectorExpression(ExprNodeColumnDesc
       exprDesc) {
-
     int columnNum = getInputColumnIndex(exprDesc.getColumn());
     VectorExpression expr = null;
     switch (opType) {
@@ -1074,7 +1074,8 @@ public class VectorizationContext {
     String columnType = vectorExpression.getOutputType();
     if (columnType.equalsIgnoreCase("long") ||
         columnType.equalsIgnoreCase("bigint") ||
-        columnType.equalsIgnoreCase("int")) {
+        columnType.equalsIgnoreCase("int") || 
+        columnType.equalsIgnoreCase("smallint")) {
       return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
     } else if (columnType.equalsIgnoreCase("double")) {
       return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
@@ -1084,5 +1085,23 @@ public class VectorizationContext {
       throw new HiveException(String.format("Must implement type %s", columnType));
     }
   }
+
+  public ObjectInspector createObjectInspector(
+      VectorExpression[] vectorExpressions, List<String> columnNames)
+      throws HiveException {
+    List<ObjectInspector> oids = new ArrayList<ObjectInspector>();
+    for (VectorExpression vexpr : vectorExpressions) {
+      ObjectInspector oi = createObjectInspector(vexpr);
+      oids.add(oi);
+    }
+    return ObjectInspectorFactory.getStandardStructObjectInspector(columnNames,
+        oids);
+  }
+
+  public void addToColumnMap(String columnName, int outputColumn) {
+    if (columnMap != null) {
+      columnMap.put(columnName, outputColumn);
+    }
+  }
 }
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java?rev=1487895&r1=1487894&r2=1487895&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java Thu May 30 15:20:01 2013
@@ -105,7 +105,8 @@ public class VectorizedColumnarSerDe ext
 
         byteRow.resetValid(numCols);
 
-        for (int k = 0; k < numCols; k++) {
+        for (int p = 0; p < batch.projectionSize; p++) {
+          int k = batch.projectedColumns[p];
           ObjectInspector foi = fields.get(k).getFieldObjectInspector();
           ColumnVector currentColVector = batch.cols[k];
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java?rev=1487895&r1=1487894&r2=1487895&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java Thu May 30 15:20:01 2013
@@ -35,6 +35,8 @@ public class VectorizedRowBatch implemen
   public ColumnVector[] cols;   // a vector for each column
   public int size;              // number of rows that qualify (i.e. haven't been filtered out)
   public int[] selected;        // array of positions of selected values
+  public int[] projectedColumns;
+  public int projectionSize;
 
   /*
    * If no filtering has been applied yet, selectedInUse is false,
@@ -80,6 +82,13 @@ public class VectorizedRowBatch implemen
     selectedInUse = false;
     this.cols = new ColumnVector[numCols];
     writableRow = new Writable[numCols];
+    projectedColumns = new int[numCols];
+
+    // Initially all columns are projected and in the same order
+    projectionSize = numCols;
+    for (int i = 0; i < numCols; i++) {
+      projectedColumns[i] = i;
+    }
   }
 
   public void initRowIterator(){
@@ -92,12 +101,14 @@ public class VectorizedRowBatch implemen
     }
     if (selectedInUse) {
       int i = selected[rowIteratorIndex];
-      for (int c = 0; c < numCols; c++) {
+      for (int k = 0; k < projectionSize; k++) {
+        int c = this.projectedColumns[k];
         writableRow[c] = cols[c].getWritableObject(i);
       }
     } else {
       int i = rowIteratorIndex;
-      for (int c = 0; c < numCols; c++) {
+      for (int k = 0; k < projectionSize; k++) {
+        int c = this.projectedColumns[k];
         writableRow[c] = cols[c].getWritableObject(i);
       }
     }
@@ -123,7 +134,8 @@ public class VectorizedRowBatch implemen
       for (int j = 0; j < size; j++) {
         int i = selected[j];
         int colIndex = 0;
-        for (ColumnVector cv : cols) {
+        for (int k = 0; k < projectionSize; k++) {
+          ColumnVector cv = cols[this.projectedColumns[k]];
           if (cv.isRepeating) {
             b.append(cv.getWritableObject(0).toString());
           } else {
@@ -141,7 +153,8 @@ public class VectorizedRowBatch implemen
     } else {
       for (int i = 0; i < size; i++) {
         int colIndex = 0;
-        for (ColumnVector cv : cols) {
+        for (int k = 0; k < projectionSize; k++) {
+          ColumnVector cv = cols[this.projectedColumns[k]];
           if (cv.isRepeating) {
             b.append(cv.getWritableObject(0).toString());
           } else {

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java?rev=1487895&r1=1487894&r2=1487895&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java Thu May 30 15:20:01 2013
@@ -55,7 +55,8 @@ public class VectorizedOrcSerde extends 
       } else {
         index = i;
       }
-      for (int k = 0; k < batch.numCols; k++) {
+      for (int p = 0; p < batch.projectionSize; p++) {
+        int k = batch.projectedColumns[p];
         Writable w;
         if (batch.cols[k].isRepeating) {
           w = batch.cols[k].getWritableObject(0);

Modified: hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSelectOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSelectOperator.java?rev=1487895&r1=1487894&r2=1487895&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSelectOperator.java (original)
+++ hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSelectOperator.java Thu May 30 15:20:01 2013
@@ -25,44 +25,66 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.exec.vector.expressions.gen.LongColAddLongColumn;
 import org.apache.hadoop.hive.ql.exec.vector.util.VectorizedRowGroupGenUtil;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.junit.Test;
 
 public class TestVectorSelectOperator {
 
+  static class ValidatorVectorSelectOperator extends VectorSelectOperator {
+
+    private static final long serialVersionUID = 1L;
+
+    public ValidatorVectorSelectOperator(VectorizationContext ctxt, OperatorDesc conf) {
+      super(ctxt, conf);
+    }
+
+    /**
+     * Override forward to do validation
+     */
+    @Override
+    public void forward(Object row, ObjectInspector rowInspector) throws HiveException {
+      VectorizedRowBatch vrg = (VectorizedRowBatch) row;
+
+      int[] projections = vrg.projectedColumns;
+      assertEquals(2, vrg.projectionSize);
+      assertEquals(2, projections[0]);
+      assertEquals(3, projections[1]);
+
+      LongColumnVector out0 = (LongColumnVector) vrg.cols[projections[0]];
+      LongColumnVector out1 = (LongColumnVector) vrg.cols[projections[1]];
+
+      LongColumnVector in0 = (LongColumnVector) vrg.cols[0];
+      LongColumnVector in1 = (LongColumnVector) vrg.cols[1];
+      LongColumnVector in2 = (LongColumnVector) vrg.cols[2];
+      LongColumnVector in3 = (LongColumnVector) vrg.cols[3];
+
+      for (int i = 0; i < VectorizedRowBatch.DEFAULT_SIZE; i++) {
+        assertEquals(in0.vector[i] + in1.vector[i], out0.vector[i]);
+        assertEquals(in2.vector[i], out0.vector[i]);
+        assertEquals(in3.vector[i], out1.vector[i]);
+      }
+    }
+  }
+
   @Test
   public void testSelectOperator() throws HiveException {
-    VectorSelectOperator vso = new VectorSelectOperator(null, new SelectDesc(false));
+
+    ValidatorVectorSelectOperator vso = new ValidatorVectorSelectOperator(null, new SelectDesc(
+        false));
 
     VectorizedRowBatch vrg = VectorizedRowGroupGenUtil.getVectorizedRowBatch(
         VectorizedRowBatch.DEFAULT_SIZE, 4, 17);
 
-    LongColAddLongColumn lcalcExpr = new LongColAddLongColumn(0,1,2);
+    LongColAddLongColumn lcalcExpr = new LongColAddLongColumn(0, 1, 2);
     IdentityExpression iexpr = new IdentityExpression(3, "long");
 
-    VectorExpression [] ves = new VectorExpression [] { lcalcExpr, iexpr };
+    VectorExpression[] ves = new VectorExpression[] {lcalcExpr, iexpr};
 
     vso.setSelectExpressions(ves);
 
     vso.processOp(vrg, 0);
-
-    VectorizedRowBatch output = vso.getOutput();
-
-    assertEquals(2, output.numCols);
-
-    LongColumnVector out0 = (LongColumnVector) output.cols[0];
-    LongColumnVector out1 = (LongColumnVector) output.cols[1];
-
-    LongColumnVector in0 = (LongColumnVector) vrg.cols[0];
-    LongColumnVector in1 = (LongColumnVector) vrg.cols[1];
-    LongColumnVector in2 = (LongColumnVector) vrg.cols[2];
-    LongColumnVector in3 = (LongColumnVector) vrg.cols[3];
-
-    for (int i = 0; i < VectorizedRowBatch.DEFAULT_SIZE; i ++) {
-      assertEquals(in0.vector[i]+in1.vector[i], out0.vector[i]);
-      assertEquals(in2.vector[i], out0.vector[i]);
-      assertEquals(in3.vector[i], out1.vector[i]);
-    }
   }
 
 }