You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/03 03:38:02 UTC

[45/50] [abbrv] hive git commit: HIVE-12878: Support Vectorization for TEXTFILE and other formats (Matt McCline, reviewed by Sergey Shelukhin)

http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
index 3eadc12..2e8331a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
@@ -24,22 +24,40 @@ import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
-import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.VectorPartitionConversion;
 import org.apache.hadoop.hive.serde2.fast.DeserializeRead;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.HiveIntervalDayTimeWritable;
+import org.apache.hadoop.hive.serde2.io.HiveIntervalYearMonthWritable;
+import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
-import org.apache.hive.common.util.DateUtils;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
 
 /**
  * This class deserializes a serialization format into a row of a VectorizedRowBatch.
- * 
+ *
  * The caller provides the hive type names and output column numbers in the order desired to
  * deserialize.
  *
@@ -54,667 +72,592 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
 
   private T deserializeRead;
 
-  private Reader<T>[] readersByValue;
-  private Reader<T>[] readersByReference;
-  private TypeInfo[] typeInfos;
+  private TypeInfo[] sourceTypeInfos;
 
   public VectorDeserializeRow(T deserializeRead) {
     this();
     this.deserializeRead = deserializeRead;
-    typeInfos = deserializeRead.typeInfos();
-    
+    sourceTypeInfos = deserializeRead.typeInfos();
   }
 
   // Not public since we must have the deserialize read object.
   private VectorDeserializeRow() {
   }
 
-  private abstract class Reader<R extends DeserializeRead> {
-    protected int columnIndex;
-
-    Reader(int columnIndex) {
-      this.columnIndex = columnIndex;
-    }
-
-    abstract void apply(VectorizedRowBatch batch, int batchIndex) throws IOException;
-  }
-
-  private abstract class AbstractLongReader extends Reader<T> {
-
-    AbstractLongReader(int columnIndex) {
-      super(columnIndex);
-    }
-  }
-
-  private class BooleanReader extends AbstractLongReader {
-
-    BooleanReader(int columnIndex) {
-      super(columnIndex);
-    }
-
-    @Override
-    void apply(VectorizedRowBatch batch, int batchIndex) throws IOException {
-      LongColumnVector colVector = (LongColumnVector) batch.cols[columnIndex];
-
-      if (deserializeRead.readCheckNull()) {
-        VectorizedBatchUtil.setNullColIsNullValue(colVector, batchIndex);
-      } else {
-        boolean value = deserializeRead.readBoolean();
-        colVector.vector[batchIndex] = (value ? 1 : 0);
-        colVector.isNull[batchIndex] = false;
-      }
-    }
-  }
-
-  private class ByteReader extends AbstractLongReader {
-
-    ByteReader(int columnIndex) {
-      super(columnIndex);
-    }
-
-    @Override
-    void apply(VectorizedRowBatch batch, int batchIndex) throws IOException {
-      LongColumnVector colVector = (LongColumnVector) batch.cols[columnIndex];
-
-      if (deserializeRead.readCheckNull()) {
-        VectorizedBatchUtil.setNullColIsNullValue(colVector, batchIndex);
-      } else {
-        byte value = deserializeRead.readByte();
-        colVector.vector[batchIndex] = (long) value;
-        colVector.isNull[batchIndex] = false;
-      }
-    }
-  }
-
-  private class ShortReader extends AbstractLongReader {
-
-    ShortReader(int columnIndex) {
-      super(columnIndex);
-    }
-
-    @Override
-    void apply(VectorizedRowBatch batch, int batchIndex) throws IOException {
-      LongColumnVector colVector = (LongColumnVector) batch.cols[columnIndex];
-
-      if (deserializeRead.readCheckNull()) {
-        VectorizedBatchUtil.setNullColIsNullValue(colVector, batchIndex);
-      } else {
-        short value = deserializeRead.readShort();
-        colVector.vector[batchIndex] = (long) value;
-        colVector.isNull[batchIndex] = false;
-      }
-    }
-  }
-
-  private class IntReader extends AbstractLongReader {
-
-    IntReader(int columnIndex) {
-      super(columnIndex);
-    }
-
-    @Override
-    void apply(VectorizedRowBatch batch, int batchIndex) throws IOException {
-      LongColumnVector colVector = (LongColumnVector) batch.cols[columnIndex];
-
-      if (deserializeRead.readCheckNull()) {
-        VectorizedBatchUtil.setNullColIsNullValue(colVector, batchIndex);
-      } else {
-        int value = deserializeRead.readInt();
-        colVector.vector[batchIndex] = (long) value;
-        colVector.isNull[batchIndex] = false;
-      }
-    }
-  }
-
-  private class LongReader extends AbstractLongReader {
-
-    LongReader(int columnIndex) {
-      super(columnIndex);
-    }
-
-    @Override
-    void apply(VectorizedRowBatch batch, int batchIndex) throws IOException {
-      LongColumnVector colVector = (LongColumnVector) batch.cols[columnIndex];
-
-      if (deserializeRead.readCheckNull()) {
-        VectorizedBatchUtil.setNullColIsNullValue(colVector, batchIndex);
-      } else {
-        long value = deserializeRead.readLong();
-        colVector.vector[batchIndex] = value;
-        colVector.isNull[batchIndex] = false;
-      }
-    }
-  }
-
-  private class DateReader extends AbstractLongReader {
-
-    DeserializeRead.ReadDateResults readDateResults;
-
-    DateReader(int columnIndex) {
-      super(columnIndex);
-      readDateResults = deserializeRead.createReadDateResults();
-    }
-
-    @Override
-    void apply(VectorizedRowBatch batch, int batchIndex) throws IOException {
-      LongColumnVector colVector = (LongColumnVector) batch.cols[columnIndex];
-
-      if (deserializeRead.readCheckNull()) {
-        VectorizedBatchUtil.setNullColIsNullValue(colVector, batchIndex);
-      } else {
-        deserializeRead.readDate(readDateResults);
-        colVector.vector[batchIndex] = (long) readDateResults.getDays();
-        colVector.isNull[batchIndex] = false;
-      }
-    }
-  }
-
-  private abstract class AbstractTimestampReader extends Reader<T> {
-
-    AbstractTimestampReader(int columnIndex) {
-      super(columnIndex);
-    }
-  }
-
-  private class TimestampReader extends AbstractTimestampReader {
-
-    DeserializeRead.ReadTimestampResults readTimestampResults;
-
-    TimestampReader(int columnIndex) {
-      super(columnIndex);
-      readTimestampResults = deserializeRead.createReadTimestampResults();
-    }
-
-    @Override
-    void apply(VectorizedRowBatch batch, int batchIndex) throws IOException {
-      TimestampColumnVector colVector = (TimestampColumnVector) batch.cols[columnIndex];
-
-      if (deserializeRead.readCheckNull()) {
-        VectorizedBatchUtil.setNullColIsNullValue(colVector, batchIndex);
-      } else {
-        deserializeRead.readTimestamp(readTimestampResults);
-        colVector.set(batchIndex, readTimestampResults.getTimestamp());
-        colVector.isNull[batchIndex] = false;
-      }
-    }
-
-  }
-
-  private class IntervalYearMonthReader extends AbstractLongReader {
-
-    DeserializeRead.ReadIntervalYearMonthResults readIntervalYearMonthResults;
-
-    IntervalYearMonthReader(int columnIndex) {
-      super(columnIndex);
-      readIntervalYearMonthResults = deserializeRead.createReadIntervalYearMonthResults();
-    }
-
-    @Override
-    void apply(VectorizedRowBatch batch, int batchIndex) throws IOException {
-      LongColumnVector colVector = (LongColumnVector) batch.cols[columnIndex];
-
-      if (deserializeRead.readCheckNull()) {
-        VectorizedBatchUtil.setNullColIsNullValue(colVector, batchIndex);
-      } else {
-        deserializeRead.readIntervalYearMonth(readIntervalYearMonthResults);
-        HiveIntervalYearMonth hiym = readIntervalYearMonthResults.getHiveIntervalYearMonth();
-        colVector.vector[batchIndex] = hiym.getTotalMonths();
-        colVector.isNull[batchIndex] = false;
+  /*
+   * These members have information for deserializing a row into the VectorizedRowBatch
+   * columns.
+   *
+   * We say "source" because when there is conversion we are converting the deserialized source into
+   * a target data type.
+   */
+  boolean[] isConvert;
+                // For each column, are we converting the row column?
+
+  int[] projectionColumnNums;
+                // Assigning can be a subset of columns, so this is the projection --
+                // the batch column numbers.
+
+  Category[] sourceCategories;
+                // The data type category of each column being deserialized.
+
+  PrimitiveCategory[] sourcePrimitiveCategories;
+                // The data type primitive category of each column being deserialized.
+
+  int[] maxLengths;
+                // For the CHAR and VARCHAR data types, the maximum character length of
+                // the columns.  Otherwise, 0.
+
+  /*
+   * These members have information for data type conversion.
+   * Not defined if there is no conversion.
+   */
+  Writable[] convertSourceWritables;
+                // Conversion requires source be placed in writable so we can call upon
+                // VectorAssignRow to convert and assign the row column.
+
+  VectorAssignRow convertVectorAssignRow;
+                // Use its conversion ability.
+
+  /*
+   * Allocate the source deserialization related arrays.
+   */
+  private void allocateArrays(int count) {
+    isConvert = new boolean[count];
+    projectionColumnNums = new int[count];
+    sourceCategories = new Category[count];
+    sourcePrimitiveCategories = new PrimitiveCategory[count];
+    maxLengths = new int[count];
+  }
+
+  /*
+   * Allocate the conversion related arrays (optional).
+   */
+  private void allocateConvertArrays(int count) {
+    convertSourceWritables = new Writable[count];
+  }
+
+  /*
+   * Initialize one column's source deserialization related arrays.
+   */
+  private void initSourceEntry(int logicalColumnIndex, int projectionColumnNum, TypeInfo sourceTypeInfo) {
+    isConvert[logicalColumnIndex] = false;
+    projectionColumnNums[logicalColumnIndex] = projectionColumnNum;
+    Category sourceCategory = sourceTypeInfo.getCategory();
+    sourceCategories[logicalColumnIndex] = sourceCategory;
+    if (sourceCategory == Category.PRIMITIVE) {
+      PrimitiveTypeInfo sourcePrimitiveTypeInfo = (PrimitiveTypeInfo) sourceTypeInfo;
+      PrimitiveCategory sourcePrimitiveCategory = sourcePrimitiveTypeInfo.getPrimitiveCategory();
+      sourcePrimitiveCategories[logicalColumnIndex] = sourcePrimitiveCategory;
+      switch (sourcePrimitiveCategory) {
+      case CHAR:
+        maxLengths[logicalColumnIndex] = ((CharTypeInfo) sourcePrimitiveTypeInfo).getLength();
+        break;
+      case VARCHAR:
+        maxLengths[logicalColumnIndex] = ((VarcharTypeInfo) sourcePrimitiveTypeInfo).getLength();
+        break;
+      default:
+        // No additional data type specific setting.
+        break;
       }
+    } else {
+      // We don't currently support complex types.
+      Preconditions.checkState(false);
     }
   }
 
-  private abstract class AbstractIntervalDayTimeReader extends Reader<T> {
+  /*
+   * Initialize the conversion related arrays.  Assumes initSourceEntry has already been called.
+   */
+  private void initConvertTargetEntry(int logicalColumnIndex) {
+    isConvert[logicalColumnIndex] = true;
 
-    AbstractIntervalDayTimeReader(int columnIndex) {
-      super(columnIndex);
+    if (sourceCategories[logicalColumnIndex] == Category.PRIMITIVE) {
+      convertSourceWritables[logicalColumnIndex] =
+          VectorizedBatchUtil.getPrimitiveWritable(sourcePrimitiveCategories[logicalColumnIndex]);
+    } else {
+      // We don't currently support complex types.
+      Preconditions.checkState(false);
     }
   }
 
-  private class IntervalDayTimeReader extends AbstractIntervalDayTimeReader {
-
-    DeserializeRead.ReadIntervalDayTimeResults readIntervalDayTimeResults;
-
-    IntervalDayTimeReader(int columnIndex) {
-      super(columnIndex);
-      readIntervalDayTimeResults = deserializeRead.createReadIntervalDayTimeResults();
-    }
-
-    @Override
-    void apply(VectorizedRowBatch batch, int batchIndex) throws IOException {
-      IntervalDayTimeColumnVector colVector = (IntervalDayTimeColumnVector) batch.cols[columnIndex];
-
-      if (deserializeRead.readCheckNull()) {
-        VectorizedBatchUtil.setNullColIsNullValue(colVector, batchIndex);
-      } else {
-        deserializeRead.readIntervalDayTime(readIntervalDayTimeResults);
-        HiveIntervalDayTime idt = readIntervalDayTimeResults.getHiveIntervalDayTime();
-        colVector.set(batchIndex, idt);
-        colVector.isNull[batchIndex] = false;
-      }
-    }
-  }
+  /*
+   * Specify the columns to deserialize into as an array.
+   */
+  public void init(int[] outputColumns) throws HiveException {
 
-  private abstract class AbstractDoubleReader extends Reader<T> {
+    final int count = sourceTypeInfos.length;
+    allocateArrays(count);
 
-    AbstractDoubleReader(int columnIndex) {
-      super(columnIndex);
+    for (int i = 0; i < count; i++) {
+      int outputColumn = outputColumns[i];
+      initSourceEntry(i, outputColumn, sourceTypeInfos[i]);
     }
   }
 
-  private class FloatReader extends AbstractDoubleReader {
-
-    FloatReader(int columnIndex) {
-      super(columnIndex);
-    }
+  /*
+   * Specify the columns to deserialize into as a list.
+   */
+  public void init(List<Integer> outputColumns) throws HiveException {
 
-    @Override
-    void apply(VectorizedRowBatch batch, int batchIndex) throws IOException {
-      DoubleColumnVector colVector = (DoubleColumnVector) batch.cols[columnIndex];
+    final int count = sourceTypeInfos.length;
+    allocateArrays(count);
 
-      if (deserializeRead.readCheckNull()) {
-        VectorizedBatchUtil.setNullColIsNullValue(colVector, batchIndex);
-      } else {
-        float value = deserializeRead.readFloat();
-        colVector.vector[batchIndex] = (double) value;
-        colVector.isNull[batchIndex] = false;
-      }
+    for (int i = 0; i < count; i++) {
+      int outputColumn = outputColumns.get(i);
+      initSourceEntry(i, outputColumn, sourceTypeInfos[i]);
     }
   }
 
-  private class DoubleReader extends AbstractDoubleReader {
-
-    DoubleReader(int columnIndex) {
-      super(columnIndex);
-    }
-
-    @Override
-    void apply(VectorizedRowBatch batch, int batchIndex) throws IOException {
-      DoubleColumnVector colVector = (DoubleColumnVector) batch.cols[columnIndex];
-
-      if (deserializeRead.readCheckNull()) {
-        VectorizedBatchUtil.setNullColIsNullValue(colVector, batchIndex);
-      } else {
-        double value = deserializeRead.readDouble();
-        colVector.vector[batchIndex] = value;
-        colVector.isNull[batchIndex] = false;
-      }
-    }
-  }
+  /*
+   * Specify the columns to deserialize into as a range starting at a column number.
+   */
+  public void init(int startColumn) throws HiveException {
 
-  private abstract class AbstractBytesReader extends Reader<T> {
+    final int count = sourceTypeInfos.length;
+    allocateArrays(count);
 
-    AbstractBytesReader(int columnIndex) {
-      super(columnIndex);
+    for (int i = 0; i < count; i++) {
+      int outputColumn = startColumn + i;
+      initSourceEntry(i, outputColumn, sourceTypeInfos[i]);
     }
   }
 
-  private class StringReaderByValue extends AbstractBytesReader {
-
-    private DeserializeRead.ReadStringResults readStringResults;
-
-    StringReaderByValue(int columnIndex) {
-      super(columnIndex);
-      readStringResults = deserializeRead.createReadStringResults();
-    }
-
-    @Override
-    void apply(VectorizedRowBatch batch, int batchIndex) throws IOException {
-      BytesColumnVector colVector = (BytesColumnVector) batch.cols[columnIndex];
+  public void init(boolean[] columnsToIncludeTruncated) throws HiveException {
 
-      if (deserializeRead.readCheckNull()) {
-        VectorizedBatchUtil.setNullColIsNullValue(colVector, batchIndex);
-      } else {
-        deserializeRead.readString(readStringResults);
-        colVector.setVal(batchIndex, readStringResults.bytes,
-                readStringResults.start, readStringResults.length);
-        colVector.isNull[batchIndex] = false;
-      }
+    if (columnsToIncludeTruncated != null) {
+      deserializeRead.setColumnsToInclude(columnsToIncludeTruncated);
     }
-  }
 
-  private class StringReaderByReference extends AbstractBytesReader {
+    final int columnCount = (columnsToIncludeTruncated == null ?
+        sourceTypeInfos.length : columnsToIncludeTruncated.length);
+    allocateArrays(columnCount);
 
-    private DeserializeRead.ReadStringResults readStringResults;
+    for (int i = 0; i < columnCount; i++) {
 
-    StringReaderByReference(int columnIndex) {
-      super(columnIndex);
-      readStringResults = deserializeRead.createReadStringResults();
-    }
+      if (columnsToIncludeTruncated != null && !columnsToIncludeTruncated[i]) {
 
-    @Override
-    void apply(VectorizedRowBatch batch, int batchIndex) throws IOException {
-      BytesColumnVector colVector = (BytesColumnVector) batch.cols[columnIndex];
+        // Field not included in query.
 
-      if (deserializeRead.readCheckNull()) {
-        VectorizedBatchUtil.setNullColIsNullValue(colVector, batchIndex);
       } else {
-        deserializeRead.readString(readStringResults);
-        colVector.setRef(batchIndex, readStringResults.bytes,
-                readStringResults.start, readStringResults.length);
-        colVector.isNull[batchIndex] = false;
-      }
-    }
-  }
-
-  private class CharReaderByValue extends AbstractBytesReader {
 
-    private DeserializeRead.ReadStringResults readStringResults;
+        initSourceEntry(i, i, sourceTypeInfos[i]);
 
-    private CharTypeInfo charTypeInfo;
-
-    CharReaderByValue(CharTypeInfo charTypeInfo, int columnIndex) {
-      super(columnIndex);
-      readStringResults = deserializeRead.createReadStringResults();
-      this.charTypeInfo = charTypeInfo;
-    }
-
-    @Override
-    void apply(VectorizedRowBatch batch, int batchIndex) throws IOException {
-      BytesColumnVector colVector = (BytesColumnVector) batch.cols[columnIndex];
-
-      if (deserializeRead.readCheckNull()) {
-        VectorizedBatchUtil.setNullColIsNullValue(colVector, batchIndex);
-      } else {
-        // Use the basic STRING bytes read to get access, then use our optimal truncate/trim method
-        // that does not use Java String objects.
-        deserializeRead.readString(readStringResults);
-        int adjustedLength = StringExpr.rightTrimAndTruncate(readStringResults.bytes,
-                readStringResults.start, readStringResults.length, charTypeInfo.getLength());
-        colVector.setVal(batchIndex, readStringResults.bytes, readStringResults.start, adjustedLength);
-        colVector.isNull[batchIndex] = false;
       }
     }
   }
 
-  private class CharReaderByReference extends AbstractBytesReader {
-
-    private DeserializeRead.ReadStringResults readStringResults;
-
-    private CharTypeInfo charTypeInfo;
-
-    CharReaderByReference(CharTypeInfo charTypeInfo, int columnIndex) {
-      super(columnIndex);
-      readStringResults = deserializeRead.createReadStringResults();
-      this.charTypeInfo = charTypeInfo;
-    }
-
-    @Override
-    void apply(VectorizedRowBatch batch, int batchIndex) throws IOException {
-      BytesColumnVector colVector = (BytesColumnVector) batch.cols[columnIndex];
-
-      if (deserializeRead.readCheckNull()) {
-        VectorizedBatchUtil.setNullColIsNullValue(colVector, batchIndex);
-      } else {
-        // Use the basic STRING bytes read to get access, then use our optimal truncate/trim method
-        // that does not use Java String objects.
-        deserializeRead.readString(readStringResults);
-        int adjustedLength = StringExpr.rightTrimAndTruncate(readStringResults.bytes,
-                readStringResults.start, readStringResults.length, charTypeInfo.getLength());
-        colVector.setRef(batchIndex, readStringResults.bytes, readStringResults.start, adjustedLength);
-        colVector.isNull[batchIndex] = false;
-      }
+  /**
+   * Initialize for converting the source data types that are going to be read with the
+   * DeserializeRead interface passed to the constructor to the target data types desired in
+   * the VectorizedRowBatch.
+   *
+   * No projection -- the column range 0 .. count-1
+   *
+   *    where count is the minimum of the target data type array size, included array size,
+   *       and source data type array size.
+   *
+   * @param targetTypeInfos
+   * @param columnsToIncludeTruncated
+   * @return the minimum count described above is returned.  That is, the number of columns
+   *         that will be processed by deserialize.
+   * @throws HiveException
+   */
+  public int initConversion(TypeInfo[] targetTypeInfos,
+      boolean[] columnsToIncludeTruncated) throws HiveException {
+
+    if (columnsToIncludeTruncated != null) {
+      deserializeRead.setColumnsToInclude(columnsToIncludeTruncated);
+    }
+
+    int targetColumnCount;
+    if (columnsToIncludeTruncated == null) {
+      targetColumnCount = targetTypeInfos.length;
+    } else {
+      targetColumnCount = Math.min(targetTypeInfos.length, columnsToIncludeTruncated.length);
     }
-  }
 
-  private class VarcharReaderByValue extends AbstractBytesReader {
+    int sourceColumnCount = Math.min(sourceTypeInfos.length, targetColumnCount);
+    allocateArrays(sourceColumnCount);
+    allocateConvertArrays(sourceColumnCount);
 
-    private DeserializeRead.ReadStringResults readStringResults;
+    boolean atLeastOneConvert = false;
+    for (int i = 0; i < sourceColumnCount; i++) {
 
-    private VarcharTypeInfo varcharTypeInfo;
+      if (columnsToIncludeTruncated != null && !columnsToIncludeTruncated[i]) {
 
-    VarcharReaderByValue(VarcharTypeInfo varcharTypeInfo, int columnIndex) {
-      super(columnIndex);
-      readStringResults = deserializeRead.createReadStringResults();
-      this.varcharTypeInfo = varcharTypeInfo;
-    }
+        // Field not included in query.
 
-    @Override
-    void apply(VectorizedRowBatch batch, int batchIndex) throws IOException {
-      BytesColumnVector colVector = (BytesColumnVector) batch.cols[columnIndex];
-
-      if (deserializeRead.readCheckNull()) {
-        VectorizedBatchUtil.setNullColIsNullValue(colVector, batchIndex);
       } else {
-        // Use the basic STRING bytes read to get access, then use our optimal truncate/trim method
-        // that does not use Java String objects.
-        deserializeRead.readString(readStringResults);
-        int adjustedLength = StringExpr.truncate(readStringResults.bytes,
-                readStringResults.start, readStringResults.length, varcharTypeInfo.getLength());
-        colVector.setVal(batchIndex, readStringResults.bytes, readStringResults.start, adjustedLength);
-        colVector.isNull[batchIndex] = false;
-      }
-    }
-  }
-
-  private class VarcharReaderByReference extends AbstractBytesReader {
 
-    private DeserializeRead.ReadStringResults readStringResults;
+        TypeInfo sourceTypeInfo = sourceTypeInfos[i];
+        TypeInfo targetTypeInfo = targetTypeInfos[i];
 
-    private VarcharTypeInfo varcharTypeInfo;
-
-    VarcharReaderByReference(VarcharTypeInfo varcharTypeInfo, int columnIndex) {
-      super(columnIndex);
-      readStringResults = deserializeRead.createReadStringResults();
-      this.varcharTypeInfo = varcharTypeInfo;
-    }
+        if (!sourceTypeInfo.equals(targetTypeInfo)) {
 
-    @Override
-    void apply(VectorizedRowBatch batch, int batchIndex) throws IOException {
-      BytesColumnVector colVector = (BytesColumnVector) batch.cols[columnIndex];
+          if (VectorPartitionConversion.isImplicitVectorColumnConversion(sourceTypeInfo, targetTypeInfo)) {
 
-      if (deserializeRead.readCheckNull()) {
-        VectorizedBatchUtil.setNullColIsNullValue(colVector, batchIndex);
-      } else {
-        // Use the basic STRING bytes read to get access, then use our optimal truncate/trim method
-        // that does not use Java String objects.
-        deserializeRead.readString(readStringResults);
-        int adjustedLength = StringExpr.truncate(readStringResults.bytes,
-                readStringResults.start, readStringResults.length, varcharTypeInfo.getLength());
-        colVector.setRef(batchIndex, readStringResults.bytes, readStringResults.start, adjustedLength);
-        colVector.isNull[batchIndex] = false;
-      }
-    }
-  }
+            // Do implicit conversion from source type to target type.
+            initSourceEntry(i, i, sourceTypeInfo);
 
-  private class BinaryReaderByValue extends AbstractBytesReader {
+          } else {
 
-    private DeserializeRead.ReadBinaryResults readBinaryResults;
+            // Do formal conversion...
+            initSourceEntry(i, i, sourceTypeInfo);
+            initConvertTargetEntry(i);
+            atLeastOneConvert = true;
 
-    BinaryReaderByValue(int columnIndex) {
-      super(columnIndex);
-      readBinaryResults = deserializeRead.createReadBinaryResults();
-    }
+          }
+        } else {
 
-    @Override
-    void apply(VectorizedRowBatch batch, int batchIndex) throws IOException {
-      BytesColumnVector colVector = (BytesColumnVector) batch.cols[columnIndex];
+          // No conversion.
+          initSourceEntry(i, i, sourceTypeInfo);
 
-      if (deserializeRead.readCheckNull()) {
-        VectorizedBatchUtil.setNullColIsNullValue(colVector, batchIndex);
-      } else {
-        deserializeRead.readBinary(readBinaryResults);
-        colVector.setVal(batchIndex, readBinaryResults.bytes,
-                readBinaryResults.start, readBinaryResults.length);
-        colVector.isNull[batchIndex] = false;
+        }
       }
     }
-  }
 
-  private class BinaryReaderByReference extends AbstractBytesReader {
+    if (atLeastOneConvert) {
 
-    private DeserializeRead.ReadBinaryResults readBinaryResults;
-
-    BinaryReaderByReference(int columnIndex) {
-      super(columnIndex);
-      readBinaryResults = deserializeRead.createReadBinaryResults();
+      // Let the VectorAssignRow class do the conversion.
+      convertVectorAssignRow = new VectorAssignRow();
+      convertVectorAssignRow.initConversion(sourceTypeInfos, targetTypeInfos,
+          columnsToIncludeTruncated);
     }
 
-    @Override
-    void apply(VectorizedRowBatch batch, int batchIndex) throws IOException {
-      BytesColumnVector colVector = (BytesColumnVector) batch.cols[columnIndex];
-
-      if (deserializeRead.readCheckNull()) {
-        VectorizedBatchUtil.setNullColIsNullValue(colVector, batchIndex);
-      } else {
-        deserializeRead.readBinary(readBinaryResults);
-        colVector.setRef(batchIndex, readBinaryResults.bytes,
-                readBinaryResults.start, readBinaryResults.length);
-        colVector.isNull[batchIndex] = false;
-      }
-    }
+    return sourceColumnCount;
   }
 
-  private class HiveDecimalReader extends Reader<T> {
-
-    private DeserializeRead.ReadDecimalResults readDecimalResults;
-
-    HiveDecimalReader(int columnIndex) {
-      super(columnIndex);
-      readDecimalResults = deserializeRead.createReadDecimalResults();
-    }
-
-    @Override
-    void apply(VectorizedRowBatch batch, int batchIndex) throws IOException {
-      DecimalColumnVector colVector = (DecimalColumnVector) batch.cols[columnIndex];
-
-      if (deserializeRead.readCheckNull()) {
-        VectorizedBatchUtil.setNullColIsNullValue(colVector, batchIndex);
-      } else {
-        deserializeRead.readHiveDecimal(readDecimalResults);
-        HiveDecimal hiveDecimal = readDecimalResults.getHiveDecimal();
-        colVector.vector[batchIndex].set(hiveDecimal);
-        colVector.isNull[batchIndex] = false;
-      }
-    }
+  public void init() throws HiveException {
+    init(0);
   }
 
-  private void addReader(int index, int outputColumn) throws HiveException {
-    Reader<T> readerByValue = null;
-    Reader<T> readerByReference = null;
-
-    PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfos[index];
-    PrimitiveCategory primitiveCategory = primitiveTypeInfo.getPrimitiveCategory();
-    switch (primitiveCategory) {
-    // case VOID:
-    //   UNDONE:
-    // break;
-    case BOOLEAN:
-      readerByValue = new BooleanReader(outputColumn);
-      break;
-    case BYTE:
-      readerByValue = new ByteReader(outputColumn);
-      break;
-    case SHORT:
-      readerByValue = new ShortReader(outputColumn);
-      break;
-    case INT:
-      readerByValue = new IntReader(outputColumn);
-      break;
-    case LONG:
-      readerByValue = new LongReader(outputColumn);
-      break;
-    case DATE:
-      readerByValue = new DateReader(outputColumn);
-      break;
-    case TIMESTAMP:
-      readerByValue = new TimestampReader(outputColumn);
-      break;
-    case FLOAT:
-      readerByValue = new FloatReader(outputColumn);
-      break;
-    case DOUBLE:
-      readerByValue = new DoubleReader(outputColumn);
-      break;
-    case STRING:
-      readerByValue = new StringReaderByValue(outputColumn);
-      readerByReference = new StringReaderByReference(outputColumn);
-      break;
-    case CHAR:
+  /**
+   * Deserialize one row column value.
+   *
+   * @param batch
+   * @param batchIndex
+   * @param logicalColumnIndex
+   * @throws IOException
+   */
+  private void deserializeRowColumn(VectorizedRowBatch batch, int batchIndex,
+      int logicalColumnIndex) throws IOException {
+    final int projectionColumnNum = projectionColumnNums[logicalColumnIndex];
+    if (deserializeRead.readCheckNull()) {
+      VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex);
+      return;
+    }
+
+    // We have a value for the row column.
+    Category sourceCategory = sourceCategories[logicalColumnIndex];
+    if (sourceCategory == null) {
+      /*
+       * This is a column that we don't want (i.e. not included).
+       * The deserializeRead.readCheckNull() has read the field, so we are done.
+       */
+      return;
+    }
+    switch (sourceCategory) {
+    case PRIMITIVE:
       {
-        CharTypeInfo charTypeInfo = (CharTypeInfo) primitiveTypeInfo;
-        readerByValue = new CharReaderByValue(charTypeInfo, outputColumn);
-        readerByReference = new CharReaderByReference(charTypeInfo, outputColumn);
+        PrimitiveCategory sourcePrimitiveCategory = sourcePrimitiveCategories[logicalColumnIndex];
+        switch (sourcePrimitiveCategory) {
+        case VOID:
+          VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex);
+          return;
+        case BOOLEAN:
+          ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
+              (deserializeRead.currentBoolean ? 1 : 0);
+          break;
+        case BYTE:
+          ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
+              deserializeRead.currentByte;
+          break;
+        case SHORT:
+          ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
+              deserializeRead.currentShort;
+          break;
+        case INT:
+          ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
+              deserializeRead.currentInt;
+          break;
+        case LONG:
+          ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
+              deserializeRead.currentLong;
+          break;
+        case TIMESTAMP:
+          ((TimestampColumnVector) batch.cols[projectionColumnNum]).set(
+              batchIndex, deserializeRead.currentTimestampWritable.getTimestamp());
+          break;
+        case DATE:
+          ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
+              deserializeRead.currentDateWritable.getDays();
+          break;
+        case FLOAT:
+          ((DoubleColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
+              deserializeRead.currentFloat;
+          break;
+        case DOUBLE:
+          ((DoubleColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
+              deserializeRead.currentDouble;
+          break;
+        case BINARY:
+        case STRING:
+          ((BytesColumnVector) batch.cols[projectionColumnNum]).setVal(
+              batchIndex,
+              deserializeRead.currentBytes,
+              deserializeRead.currentBytesStart,
+              deserializeRead.currentBytesLength);
+          break;
+        case VARCHAR:
+          {
+            // Use the basic STRING bytes read to get access, then use our optimal truncate/trim method
+            // that does not use Java String objects.
+            int adjustedLength = StringExpr.truncate(
+                deserializeRead.currentBytes,
+                deserializeRead.currentBytesStart,
+                deserializeRead.currentBytesLength,
+                maxLengths[logicalColumnIndex]);
+            ((BytesColumnVector) batch.cols[projectionColumnNum]).setVal(
+                batchIndex,
+                deserializeRead.currentBytes,
+                deserializeRead.currentBytesStart,
+                adjustedLength);
+          }
+          break;
+        case CHAR:
+          {
+            // Use the basic STRING bytes read to get access, then use our optimal truncate/trim method
+            // that does not use Java String objects.
+            int adjustedLength = StringExpr.rightTrimAndTruncate(
+                deserializeRead.currentBytes,
+                deserializeRead.currentBytesStart,
+                deserializeRead.currentBytesLength,
+                maxLengths[logicalColumnIndex]);
+            ((BytesColumnVector) batch.cols[projectionColumnNum]).setVal(
+                batchIndex,
+                deserializeRead.currentBytes,
+                deserializeRead.currentBytesStart,
+                adjustedLength);
+          }
+          break;
+        case DECIMAL:
+          ((DecimalColumnVector) batch.cols[projectionColumnNum]).set(
+              batchIndex, deserializeRead.currentHiveDecimalWritable.getHiveDecimal());
+          break;
+        case INTERVAL_YEAR_MONTH:
+          ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
+              deserializeRead.currentHiveIntervalYearMonthWritable.getHiveIntervalYearMonth().getTotalMonths();
+          break;
+        case INTERVAL_DAY_TIME:
+          ((IntervalDayTimeColumnVector) batch.cols[projectionColumnNum]).set(
+              batchIndex, deserializeRead.currentHiveIntervalDayTimeWritable.getHiveIntervalDayTime());
+          break;
+        default:
+          throw new RuntimeException("Primitive category " + sourcePrimitiveCategory.name() +
+              " not supported");
+        }
       }
       break;
-    case VARCHAR:
+    default:
+      throw new RuntimeException("Category " + sourceCategory.name() + " not supported");
+    }
+
+    // We always set the null flag to false when there is a value.
+    batch.cols[projectionColumnNum].isNull[batchIndex] = false;
+  }
+
+  /**
+   * Deserialize and convert one row column value.
+   *
+   * We deserialize into a writable and then pass that writable to an instance of VectorAssignRow
+   * to convert the writable to the target data type and assign it into the VectorizedRowBatch.
+   *
+   * @param batch
+   * @param batchIndex
+   * @param logicalColumnIndex
+   * @throws IOException
+   */
+  private void deserializeConvertRowColumn(VectorizedRowBatch batch, int batchIndex,
+      int logicalColumnIndex) throws IOException {
+    final int projectionColumnNum = projectionColumnNums[logicalColumnIndex];
+    if (deserializeRead.readCheckNull()) {
+      VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex);
+      return;
+    }
+
+    // We have a value for the row column.
+    Category sourceCategory = sourceCategories[logicalColumnIndex];
+    if (sourceCategory == null) {
+      /*
+       * This is a column that we don't want (i.e. not included).
+       * The deserializeRead.readCheckNull() has read the field, so we are done.
+       */
+      return;
+    }
+    Writable convertSourceWritable = convertSourceWritables[logicalColumnIndex];
+    switch (sourceCategory) {
+    case PRIMITIVE:
       {
-        VarcharTypeInfo varcharTypeInfo = (VarcharTypeInfo) primitiveTypeInfo;
-        readerByValue = new VarcharReaderByValue(varcharTypeInfo, outputColumn);
-        readerByReference = new VarcharReaderByReference(varcharTypeInfo, outputColumn);
+        PrimitiveCategory sourcePrimitiveCategory = sourcePrimitiveCategories[logicalColumnIndex];
+        switch (sourcePrimitiveCategory) {
+        case VOID:
+          convertSourceWritable = null;
+          break;
+        case BOOLEAN:
+          ((BooleanWritable) convertSourceWritable).set(deserializeRead.currentBoolean);
+          break;
+        case BYTE:
+          ((ByteWritable) convertSourceWritable).set(deserializeRead.currentByte);
+          break;
+        case SHORT:
+          ((ShortWritable) convertSourceWritable).set(deserializeRead.currentShort);
+          break;
+        case INT:
+          ((IntWritable) convertSourceWritable).set(deserializeRead.currentInt);
+          break;
+        case LONG:
+          ((LongWritable) convertSourceWritable).set(deserializeRead.currentLong);
+          break;
+        case TIMESTAMP:
+          ((TimestampWritable) convertSourceWritable).set(deserializeRead.currentTimestampWritable);
+          break;
+        case DATE:
+          ((DateWritable) convertSourceWritable).set(deserializeRead.currentDateWritable);
+          break;
+        case FLOAT:
+          ((FloatWritable) convertSourceWritable).set(deserializeRead.currentFloat);
+          break;
+        case DOUBLE:
+          ((DoubleWritable) convertSourceWritable).set(deserializeRead.currentDouble);
+          break;
+        case BINARY:
+          if (deserializeRead.currentBytes == null) {
+            LOG.info("null binary entry: batchIndex " + batchIndex + " projection column num " + projectionColumnNum);
+          }
+
+          ((BytesWritable) convertSourceWritable).set(
+              deserializeRead.currentBytes,
+              deserializeRead.currentBytesStart,
+              deserializeRead.currentBytesLength);
+          break;
+        case STRING:
+          if (deserializeRead.currentBytes == null) {
+            throw new RuntimeException(
+                "null string entry: batchIndex " + batchIndex + " projection column num " + projectionColumnNum);
+          }
+
+          // Use org.apache.hadoop.io.Text as our helper to go from byte[] to String.
+          ((Text) convertSourceWritable).set(
+              deserializeRead.currentBytes,
+              deserializeRead.currentBytesStart,
+              deserializeRead.currentBytesLength);
+          break;
+        case VARCHAR:
+          {
+            // Use the basic STRING bytes read to get access, then use our optimal truncate/trim method
+            // that does not use Java String objects.
+            if (deserializeRead.currentBytes == null) {
+              throw new RuntimeException(
+                  "null varchar entry: batchIndex " + batchIndex + " projection column num " + projectionColumnNum);
+            }
+
+            int adjustedLength = StringExpr.truncate(
+                deserializeRead.currentBytes,
+                deserializeRead.currentBytesStart,
+                deserializeRead.currentBytesLength,
+                maxLengths[logicalColumnIndex]);
+
+            ((HiveVarcharWritable) convertSourceWritable).set(
+                new String(
+                  deserializeRead.currentBytes,
+                  deserializeRead.currentBytesStart,
+                  adjustedLength,
+                  Charsets.UTF_8),
+                -1);
+          }
+          break;
+        case CHAR:
+          {
+            // Use the basic STRING bytes read to get access, then use our optimal truncate/trim method
+            // that does not use Java String objects.
+            if (deserializeRead.currentBytes == null) {
+              throw new RuntimeException(
+                  "null char entry: batchIndex " + batchIndex + " projection column num " + projectionColumnNum);
+            }
+
+            int adjustedLength = StringExpr.rightTrimAndTruncate(
+                deserializeRead.currentBytes,
+                deserializeRead.currentBytesStart,
+                deserializeRead.currentBytesLength,
+                maxLengths[logicalColumnIndex]);
+
+            ((HiveCharWritable) convertSourceWritable).set(
+                new String(
+                  deserializeRead.currentBytes,
+                  deserializeRead.currentBytesStart,
+                  adjustedLength, Charsets.UTF_8),
+                -1);
+          }
+          break;
+        case DECIMAL:
+          ((HiveDecimalWritable) convertSourceWritable).set(
+              deserializeRead.currentHiveDecimalWritable);
+          break;
+        case INTERVAL_YEAR_MONTH:
+          ((HiveIntervalYearMonthWritable) convertSourceWritable).set(
+              deserializeRead.currentHiveIntervalYearMonthWritable);
+          break;
+        case INTERVAL_DAY_TIME:
+          ((HiveIntervalDayTimeWritable) convertSourceWritable).set(
+              deserializeRead.currentHiveIntervalDayTimeWritable);
+          break;
+        default:
+          throw new RuntimeException("Primitive category " + sourcePrimitiveCategory.name() +
+              " not supported");
+        }
       }
       break;
-    case BINARY:
-      readerByValue = new BinaryReaderByValue(outputColumn);
-      readerByReference = new BinaryReaderByReference(outputColumn);
-      break;
-    case DECIMAL:
-      readerByValue = new HiveDecimalReader(outputColumn);
-      break;
-    case INTERVAL_YEAR_MONTH:
-      readerByValue = new IntervalYearMonthReader(outputColumn);
-      break;
-    case INTERVAL_DAY_TIME:
-      readerByValue = new IntervalDayTimeReader(outputColumn);
-      break;
     default:
-      throw new HiveException("Unexpected primitive type category " + primitiveCategory);
-    }
-
-    readersByValue[index] = readerByValue;
-    if (readerByReference == null) {
-      readersByReference[index] = readerByValue;
-    } else {
-      readersByReference[index] = readerByReference;
-    }
-  }
-
-  public void init(int[] outputColumns) throws HiveException {
-
-    readersByValue = new Reader[typeInfos.length];
-    readersByReference = new Reader[typeInfos.length];
-
-    for (int i = 0; i < typeInfos.length; i++) {
-      int outputColumn = outputColumns[i];
-      addReader(i, outputColumn);
-    }
-  }
-
-  public void init(List<Integer> outputColumns) throws HiveException {
-
-    readersByValue = new Reader[typeInfos.length];
-    readersByReference = new Reader[typeInfos.length];
-
-    for (int i = 0; i < typeInfos.length; i++) {
-      int outputColumn = outputColumns.get(i);
-      addReader(i, outputColumn);
-    }
-  }
-
-  public void init(int startColumn) throws HiveException {
-
-    readersByValue = new Reader[typeInfos.length];
-    readersByReference = new Reader[typeInfos.length];
-
-    for (int i = 0; i < typeInfos.length; i++) {
-      int outputColumn = startColumn + i;
-      addReader(i, outputColumn);
+      throw new RuntimeException("Category " + sourceCategory.name() + " not supported");
     }
-  }
 
-  public void init() throws HiveException {
-    init(0);
+    /*
+     * Convert our source object we just read into the target object and store that in the
+     * VectorizedRowBatch.
+     */
+    convertVectorAssignRow.assignConvertRowColumn(batch, batchIndex, logicalColumnIndex,
+        convertSourceWritable);
   }
 
+  /**
+   * Specify the range of bytes to deserialize in the next call to the deserialize method.
+   *
+   * @param bytes
+   * @param offset
+   * @param length
+   */
   public void setBytes(byte[] bytes, int offset, int length) {
     deserializeRead.set(bytes, offset, length);
   }
 
-  public void deserializeByValue(VectorizedRowBatch batch, int batchIndex) throws IOException {
+  /**
+   * Deserialize a row from the range of bytes specified by setBytes.
+   *
+   * @param batch
+   * @param batchIndex
+   * @throws IOException
+   */
+  public void deserialize(VectorizedRowBatch batch, int batchIndex) throws IOException {
+    final int count = isConvert.length;
     int i = 0;
     try {
-      while (i < readersByValue.length) {
-        readersByValue[i].apply(batch, batchIndex);
+      while (i < count) {
+        if (isConvert[i]) {
+          deserializeConvertRowColumn(batch, batchIndex, i);
+        } else {
+          deserializeRowColumn(batch, batchIndex, i);
+        }
         i++;    // Increment after the apply which could throw an exception.
       }
     } catch (EOFException e) {
@@ -723,27 +666,14 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
     deserializeRead.extraFieldsCheck();
   }
 
-  public void deserializeByReference(VectorizedRowBatch batch, int batchIndex) throws IOException {
-    int i = 0;
-    try {
-      while (i < readersByReference.length) {
-      readersByReference[i].apply(batch, batchIndex);
-      i++;    // Increment after the apply which could throw an exception.
-    }
-    } catch (EOFException e) {
-      throwMoreDetailedException(e, i);
-    }
-    deserializeRead.extraFieldsCheck();
-  }
-
   private void throwMoreDetailedException(IOException e, int index) throws EOFException {
     StringBuilder sb = new StringBuilder();
-    sb.append("Detail: \"" + e.toString() + "\" occured for field " + index + " of " +  typeInfos.length + " fields (");
-    for (int i = 0; i < typeInfos.length; i++) {
+    sb.append("Detail: \"" + e.toString() + "\" occured for field " + index + " of " +  sourceTypeInfos.length + " fields (");
+    for (int i = 0; i < sourceTypeInfos.length; i++) {
       if (i > 0) {
         sb.append(", ");
       }
-      sb.append(((PrimitiveTypeInfo) typeInfos[i]).getPrimitiveCategory().name());
+      sb.append(((PrimitiveTypeInfo) sourceTypeInfos[i]).getPrimitiveCategory().name());
     }
     sb.append(")");
     throw new EOFException(sb.toString());

http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
index e883f38..c965dc8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
@@ -18,758 +18,329 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
-import java.io.IOException;
-import java.sql.Date;
-import java.sql.Timestamp;
 import java.util.List;
 
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.common.type.HiveChar;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
-import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
-import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.HiveIntervalDayTimeWritable;
+import org.apache.hadoop.hive.serde2.io.HiveIntervalYearMonthWritable;
+import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableHiveCharObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableHiveDecimalObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableHiveVarcharObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hive.common.util.DateUtils;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.base.Charsets;
 
 /**
- * This class extracts specified VectorizedRowBatch row columns into a Writable row Object[].
- *
- * The caller provides the hive type names and target column numbers in the order desired to
- * extract from the Writable row Object[].
+ * This class extracts specified VectorizedRowBatch row columns into writables.
  *
- * This class is abstract to allow the subclasses to control batch reuse.
+ * The caller provides the data types and projection column numbers of a subset of the columns
+ * to extract.
  */
-public abstract class VectorExtractRow {
+public class VectorExtractRow {
+
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LoggerFactory.getLogger(VectorExtractRow.class);
 
-  private boolean tolerateNullColumns;
-
-  public VectorExtractRow() {
-    // UNDONE: For now allow null columns until vector_decimal_mapjoin.q is understood...
-    tolerateNullColumns = true;
-  }
-
-  protected abstract class Extractor {
-    protected int columnIndex;
-    protected Object object;
-
-    public Extractor(int columnIndex) {
-      this.columnIndex = columnIndex;
-    }
-
-    public int getColumnIndex() {
-      return columnIndex;
-    }
-
-    abstract void setColumnVector(VectorizedRowBatch batch);
-
-    abstract void forgetColumnVector();
-
-    abstract Object extract(int batchIndex);
-  }
-
-  private class VoidExtractor extends Extractor {
-
-    VoidExtractor(int columnIndex) {
-      super(columnIndex);
-    }
-
-    @Override
-    void setColumnVector(VectorizedRowBatch batch) {
-    }
-
-    @Override
-    void forgetColumnVector() {
-    }
-
-    @Override
-    Object extract(int batchIndex) {
-      return null;
-    }
-  }
-
-  private abstract class AbstractLongExtractor extends Extractor {
-
-    protected LongColumnVector colVector;
-    protected long[] vector;
-
-    AbstractLongExtractor(int columnIndex) {
-      super(columnIndex);
-    }
-
-    @Override
-    void setColumnVector(VectorizedRowBatch batch) {
-      colVector = (LongColumnVector) batch.cols[columnIndex];
-      vector = colVector.vector;
-    }
-
-    @Override
-    void forgetColumnVector() {
-      colVector = null;
-      vector = null;
-    }
-  }
-
-  protected class BooleanExtractor extends AbstractLongExtractor {
-
-    BooleanExtractor(int columnIndex) {
-      super(columnIndex);
-      object = PrimitiveObjectInspectorFactory.writableBooleanObjectInspector.create(false);
-    }
-
-    @Override
-    Object extract(int batchIndex) {
-      int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex);
-      if (colVector.noNulls || !colVector.isNull[adjustedIndex]) {
-        long value = vector[adjustedIndex];
-        PrimitiveObjectInspectorFactory.writableBooleanObjectInspector.set(object, value == 0 ? false : true);
-        return object;
-      } else {
-        return null;
-      }
-    }
-  }
-
-  protected class ByteExtractor extends AbstractLongExtractor {
-
-    ByteExtractor(int columnIndex) {
-      super(columnIndex);
-      object = PrimitiveObjectInspectorFactory.writableByteObjectInspector.create((byte) 0);
-    }
-
-    @Override
-    Object extract(int batchIndex) {
-      int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex);
-      if (colVector.noNulls || !colVector.isNull[adjustedIndex]) {
-        long value = vector[adjustedIndex];
-        PrimitiveObjectInspectorFactory.writableByteObjectInspector.set(object, (byte) value);
-        return object;
-      } else {
-        return null;
+  /*
+   * These members have information for extracting row column objects from VectorizedRowBatch
+   * columns.
+   */
+  int[] projectionColumnNums;
+              // Extraction can be a subset of columns, so this is the projection --
+              // the batch column numbers.
+
+  Category[] categories;
+              // The data type category of each column being extracted.
+
+  PrimitiveCategory[] primitiveCategories;
+              // The data type primitive category of each column being extracted.
+
+  int[] maxLengths;
+              // For the CHAR and VARCHAR data types, the maximum character length of
+              // the columns.  Otherwise, 0.
+
+  Writable[] primitiveWritables;
+            // The extracted values will be placed in these writables.
+
+  /*
+   * Allocate the various arrays.
+   */
+  private void allocateArrays(int count) {
+    projectionColumnNums = new int[count];
+    categories = new Category[count];
+    primitiveCategories = new PrimitiveCategory[count];
+    maxLengths = new int[count];
+    primitiveWritables = new Writable[count];
+  }
+
+  /*
+   * Initialize one column's array entries.
+   */
+  private void initEntry(int logicalColumnIndex, int projectionColumnNum, TypeInfo typeInfo) {
+    projectionColumnNums[logicalColumnIndex] = projectionColumnNum;
+    Category category = typeInfo.getCategory();
+    categories[logicalColumnIndex] = category;
+    if (category == Category.PRIMITIVE) {
+      PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
+      PrimitiveCategory primitiveCategory = primitiveTypeInfo.getPrimitiveCategory();
+      primitiveCategories[logicalColumnIndex] = primitiveCategory;
+
+      switch (primitiveCategory) {
+      case CHAR:
+        maxLengths[logicalColumnIndex] = ((CharTypeInfo) primitiveTypeInfo).getLength();
+        break;
+      case VARCHAR:
+        maxLengths[logicalColumnIndex] = ((VarcharTypeInfo) primitiveTypeInfo).getLength();
+        break;
+      default:
+        // No additional data type specific setting.
+        break;
       }
-    }
-  }
-
-  private class ShortExtractor extends AbstractLongExtractor {
-
-    ShortExtractor(int columnIndex) {
-      super(columnIndex);
-      object = PrimitiveObjectInspectorFactory.writableShortObjectInspector.create((short) 0);
-    }
 
-    @Override
-    Object extract(int batchIndex) {
-      int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex);
-      if (colVector.noNulls || !colVector.isNull[adjustedIndex]) {
-        long value = vector[adjustedIndex];
-        PrimitiveObjectInspectorFactory.writableShortObjectInspector.set(object, (short) value);
-        return object;
-      } else {
-        return null;
-      }
+      primitiveWritables[logicalColumnIndex] =
+          VectorizedBatchUtil.getPrimitiveWritable(primitiveCategory);
     }
   }
 
-  private class IntExtractor extends AbstractLongExtractor {
-
-    IntExtractor(int columnIndex) {
-      super(columnIndex);
-      object = PrimitiveObjectInspectorFactory.writableIntObjectInspector.create(0);
-    }
-
-    @Override
-    Object extract(int batchIndex) {
-      int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex);
-      if (colVector.noNulls || !colVector.isNull[adjustedIndex]) {
-        long value = vector[adjustedIndex];
-        PrimitiveObjectInspectorFactory.writableIntObjectInspector.set(object, (int) value);
-        return object;
-      } else {
-        return null;
-      }
-    }
-  }
-
-  private class LongExtractor extends AbstractLongExtractor {
-
-    LongExtractor(int columnIndex) {
-      super(columnIndex);
-      object = PrimitiveObjectInspectorFactory.writableLongObjectInspector.create(0);
-    }
-
-    @Override
-    Object extract(int batchIndex) {
-      int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex);
-      if (colVector.noNulls || !colVector.isNull[adjustedIndex]) {
-        long value = vector[adjustedIndex];
-        PrimitiveObjectInspectorFactory.writableLongObjectInspector.set(object, value);
-        return object;
-      } else {
-        return null;
-      }
-    }
-  }
-
-  private class DateExtractor extends AbstractLongExtractor {
-
-    private Date date;
-
-    DateExtractor(int columnIndex) {
-      super(columnIndex);
-      object = PrimitiveObjectInspectorFactory.writableDateObjectInspector.create(new Date(0));
-      date = new Date(0);
-    }
-
-    @Override
-    Object extract(int batchIndex) {
-      int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex);
-      if (colVector.noNulls || !colVector.isNull[adjustedIndex]) {
-        long value = vector[adjustedIndex];
-        date.setTime(DateWritable.daysToMillis((int) value));
-        PrimitiveObjectInspectorFactory.writableDateObjectInspector.set(object, date);
-        return object;
-      } else {
-        return null;
-      }
-    }
-  }
-
-  private abstract class AbstractTimestampExtractor extends Extractor {
-
-    protected TimestampColumnVector colVector;
-
-    AbstractTimestampExtractor(int columnIndex) {
-      super(columnIndex);
-    }
-
-    @Override
-    void setColumnVector(VectorizedRowBatch batch) {
-      colVector = (TimestampColumnVector) batch.cols[columnIndex];
-    }
-
-    @Override
-    void forgetColumnVector() {
-      colVector = null;
-    }
-  }
-
-  private class TimestampExtractor extends AbstractTimestampExtractor {
-
-    protected Timestamp timestamp;
-
-    TimestampExtractor(int columnIndex) {
-      super(columnIndex);
-      object = PrimitiveObjectInspectorFactory.writableTimestampObjectInspector.create(new Timestamp(0));
-      timestamp = new Timestamp(0);
-    }
-
-    @Override
-    Object extract(int batchIndex) {
-      int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex);
-      if (colVector.noNulls || !colVector.isNull[adjustedIndex]) {
-        colVector.timestampUpdate(timestamp, adjustedIndex);
-        PrimitiveObjectInspectorFactory.writableTimestampObjectInspector.set(object, timestamp);
-        return object;
-      } else {
-        return null;
-      }
-    }
-  }
-
-  private class IntervalYearMonthExtractor extends AbstractLongExtractor {
-
-    private HiveIntervalYearMonth hiveIntervalYearMonth;
-
-    IntervalYearMonthExtractor(int columnIndex) {
-      super(columnIndex);
-      object = PrimitiveObjectInspectorFactory.writableHiveIntervalYearMonthObjectInspector.create(new HiveIntervalYearMonth(0));
-      hiveIntervalYearMonth = new HiveIntervalYearMonth(0);
-    }
-
-    @Override
-    Object extract(int batchIndex) {
-      int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex);
-      if (colVector.noNulls || !colVector.isNull[adjustedIndex]) {
-        int totalMonths = (int) vector[adjustedIndex];
-        hiveIntervalYearMonth.set(totalMonths);
-        PrimitiveObjectInspectorFactory.writableHiveIntervalYearMonthObjectInspector.set(object, hiveIntervalYearMonth);
-        return object;
-      } else {
-        return null;
-      }
-    }
-  }
-
-  private abstract class AbstractIntervalDayTimeExtractor extends Extractor {
-
-    protected IntervalDayTimeColumnVector colVector;
-
-    AbstractIntervalDayTimeExtractor(int columnIndex) {
-      super(columnIndex);
-    }
-
-    @Override
-    void setColumnVector(VectorizedRowBatch batch) {
-      colVector = (IntervalDayTimeColumnVector) batch.cols[columnIndex];
-    }
+  /*
+   * Initialize using a StructObjectInspector and a column projection list.
+   */
+  public void init(StructObjectInspector structObjectInspector, List<Integer> projectedColumns)
+      throws HiveException {
 
-    @Override
-    void forgetColumnVector() {
-      colVector = null;
-    }
-  }
-
-  private class IntervalDayTimeExtractor extends AbstractIntervalDayTimeExtractor {
-
-    private HiveIntervalDayTime hiveIntervalDayTime;
-
-    IntervalDayTimeExtractor(int columnIndex) {
-      super(columnIndex);
-      object = PrimitiveObjectInspectorFactory.writableHiveIntervalDayTimeObjectInspector.create(new HiveIntervalDayTime(0, 0));
-      hiveIntervalDayTime = new HiveIntervalDayTime(0, 0);
-    }
-
-    @Override
-    Object extract(int batchIndex) {
-      int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex);
-      if (colVector.noNulls || !colVector.isNull[adjustedIndex]) {
-        hiveIntervalDayTime.set(colVector.asScratchIntervalDayTime(adjustedIndex));
-        PrimitiveObjectInspectorFactory.writableHiveIntervalDayTimeObjectInspector.set(object, hiveIntervalDayTime);
-        return object;
-      } else {
-        return null;
-      }
-    }
-  }
-
-  private abstract class AbstractDoubleExtractor extends Extractor {
-
-    protected DoubleColumnVector colVector;
-    protected double[] vector;
-
-    AbstractDoubleExtractor(int columnIndex) {
-      super(columnIndex);
-    }
-
-    @Override
-    void setColumnVector(VectorizedRowBatch batch) {
-      colVector = (DoubleColumnVector) batch.cols[columnIndex];
-      vector = colVector.vector;
-    }
-
-    @Override
-    void forgetColumnVector() {
-      colVector = null;
-      vector = null;
-    }
-  }
-
-  private class FloatExtractor extends AbstractDoubleExtractor {
-
-    FloatExtractor(int columnIndex) {
-      super(columnIndex);
-      object = PrimitiveObjectInspectorFactory.writableFloatObjectInspector.create(0f);
-    }
+    List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs();
+    final int count = fields.size();
+    allocateArrays(count);
 
-    @Override
-    Object extract(int batchIndex) {
-      int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex);
-      if (colVector.noNulls || !colVector.isNull[adjustedIndex]) {
-        double value = vector[adjustedIndex];
-        PrimitiveObjectInspectorFactory.writableFloatObjectInspector.set(object, (float) value);
-        return object;
-      } else {
-        return null;
-      }
-    }
-  }
+    for (int i = 0; i < count; i++) {
 
-  private class DoubleExtractor extends AbstractDoubleExtractor {
+      int projectionColumnNum = projectedColumns.get(i);
 
-    DoubleExtractor(int columnIndex) {
-      super(columnIndex);
-      object = PrimitiveObjectInspectorFactory.writableDoubleObjectInspector.create(0f);
-    }
+      StructField field = fields.get(i);
+      ObjectInspector fieldInspector = field.getFieldObjectInspector();
+      TypeInfo typeInfo =
+          TypeInfoUtils.getTypeInfoFromTypeString(fieldInspector.getTypeName());
 
-    @Override
-    Object extract(int batchIndex) {
-      int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex);
-      if (colVector.noNulls || !colVector.isNull[adjustedIndex]) {
-        double value = vector[adjustedIndex];
-        PrimitiveObjectInspectorFactory.writableDoubleObjectInspector.set(object, value);
-        return object;
-      } else {
-        return null;
-      }
+      initEntry(i, projectionColumnNum, typeInfo);
     }
   }
 
-  private abstract class AbstractBytesExtractor extends Extractor {
+  /*
+   * Initialize using data type names.
+   * No projection -- the column range is 0 .. typeNames.size()-1
+   */
+  public void init(List<String> typeNames) throws HiveException {
 
-    protected BytesColumnVector colVector;
+    final int count = typeNames.size();
+    allocateArrays(count);
 
-    AbstractBytesExtractor(int columnIndex) {
-      super(columnIndex);
-    }
+    for (int i = 0; i < count; i++) {
 
-    @Override
-    void setColumnVector(VectorizedRowBatch batch) {
-      colVector = (BytesColumnVector) batch.cols[columnIndex];
-    }
+      TypeInfo typeInfo =
+          TypeInfoUtils.getTypeInfoFromTypeString(typeNames.get(i));
 
-    @Override
-    void forgetColumnVector() {
-      colVector = null;
+      initEntry(i, i, typeInfo);
     }
   }
 
-  private class BinaryExtractorByValue extends AbstractBytesExtractor {
-
-    private DataOutputBuffer buffer;
-
-    // Use the BytesWritable instance here as a reference to data saved in buffer.  We do not
-    // want to pass the binary object inspector a byte[] since we would need to allocate it on the
-    // heap each time to get the length correct.
-    private BytesWritable bytesWritable;
-
-    BinaryExtractorByValue(int columnIndex) {
-      super(columnIndex);
-      object = PrimitiveObjectInspectorFactory.writableBinaryObjectInspector.create(ArrayUtils.EMPTY_BYTE_ARRAY);
-      buffer = new DataOutputBuffer();
-      bytesWritable = new BytesWritable();
-    }
-
-    @Override
-    Object extract(int batchIndex) {
-      int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex);
-      if (colVector.noNulls || !colVector.isNull[adjustedIndex]) {
-        byte[] bytes = colVector.vector[adjustedIndex];
-        int start = colVector.start[adjustedIndex];
-        int length = colVector.length[adjustedIndex];
-
-        // Save a copy of the binary data.
-        buffer.reset();
-        try {
-          buffer.write(bytes, start, length);
-        } catch (IOException ioe) {
-          throw new IllegalStateException("bad write", ioe);
-        }
-
-        bytesWritable.set(buffer.getData(), 0, buffer.getLength());
-        PrimitiveObjectInspectorFactory.writableBinaryObjectInspector.set(object, bytesWritable);
-        return object;
-      } else {
-        return null;
-      }
+  public int getCount() {
+    return projectionColumnNums.length;
+  }
+
+  /**
+   * Extract a row's column object from the ColumnVector at batchIndex in the VectorizedRowBatch.
+   *
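+   * @param batch the batch whose projected column vector is read
+   * @param batchIndex the row position within the batch
+   * @param logicalColumnIndex index into this object's projection, not the batch column number
+   * @return the column's reused Writable, or null if the column vector or value is null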
+   */
+  public Object extractRowColumn(VectorizedRowBatch batch, int batchIndex, int logicalColumnIndex) {
+    final int projectionColumnNum = projectionColumnNums[logicalColumnIndex];
+    ColumnVector colVector = batch.cols[projectionColumnNum];
+    if (colVector == null) {
+      // In rare cases, the planner will not include columns for reading, but other parts of
+      // execution will ask for them without using them.
+      return null;
     }
-  }
-
-  private class StringExtractorByValue extends AbstractBytesExtractor {
-
-    // Use org.apache.hadoop.io.Text as our helper to go from byte[] to String.
-    private Text text;
-
-    StringExtractorByValue(int columnIndex) {
-      super(columnIndex);
-      object = PrimitiveObjectInspectorFactory.writableStringObjectInspector.create(StringUtils.EMPTY);
-      text = new Text();
+    int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex);
+    if (!colVector.noNulls && colVector.isNull[adjustedIndex]) {
+      return null;
     }
 
-    @Override
-    Object extract(int batchIndex) {
-      int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex);
-      if (colVector.noNulls || !colVector.isNull[adjustedIndex]) {
-        byte[] value = colVector.vector[adjustedIndex];
-        int start = colVector.start[adjustedIndex];
-        int length = colVector.length[adjustedIndex];
-
-        if (value == null) {
-          LOG.info("null string entry: batchIndex " + batchIndex + " columnIndex " + columnIndex);
+    Category category = categories[logicalColumnIndex];
+    switch (category) {
+    case PRIMITIVE:
+      {
+        Writable primitiveWritable =
+            primitiveWritables[logicalColumnIndex];
+        PrimitiveCategory primitiveCategory = primitiveCategories[logicalColumnIndex];
+        switch (primitiveCategory) {
+        case VOID:
+          return null;
+        case BOOLEAN:
+          ((BooleanWritable) primitiveWritable).set(
+              ((LongColumnVector) batch.cols[projectionColumnNum]).vector[adjustedIndex] == 0 ?
+                  false : true);
+          return primitiveWritable;
+        case BYTE:
+          ((ByteWritable) primitiveWritable).set(
+              (byte) ((LongColumnVector) batch.cols[projectionColumnNum]).vector[adjustedIndex]);
+          return primitiveWritable;
+        case SHORT:
+          ((ShortWritable) primitiveWritable).set(
+              (short) ((LongColumnVector) batch.cols[projectionColumnNum]).vector[adjustedIndex]);
+          return primitiveWritable;
+        case INT:
+          ((IntWritable) primitiveWritable).set(
+              (int) ((LongColumnVector) batch.cols[projectionColumnNum]).vector[adjustedIndex]);
+          return primitiveWritable;
+        case LONG:
+          ((LongWritable) primitiveWritable).set(
+              ((LongColumnVector) batch.cols[projectionColumnNum]).vector[adjustedIndex]);
+          return primitiveWritable;
+        case TIMESTAMP:
+          ((TimestampWritable) primitiveWritable).set(
+              ((TimestampColumnVector) batch.cols[projectionColumnNum]).asScratchTimestamp(adjustedIndex));
+          return primitiveWritable;
+        case DATE:
+          ((DateWritable) primitiveWritable).set(
+              (int) ((LongColumnVector) batch.cols[projectionColumnNum]).vector[adjustedIndex]);
+          return primitiveWritable;
+        case FLOAT:
+          ((FloatWritable) primitiveWritable).set(
+              (float) ((DoubleColumnVector) batch.cols[projectionColumnNum]).vector[adjustedIndex]);
+          return primitiveWritable;
+        case DOUBLE:
+          ((DoubleWritable) primitiveWritable).set(
+              ((DoubleColumnVector) batch.cols[projectionColumnNum]).vector[adjustedIndex]);
+          return primitiveWritable;
+        case BINARY:
+          {
+            BytesColumnVector bytesColVector =
+                ((BytesColumnVector) batch.cols[projectionColumnNum]);
+            byte[] bytes = bytesColVector.vector[adjustedIndex];
+            int start = bytesColVector.start[adjustedIndex];
+            int length = bytesColVector.length[adjustedIndex];
+
+            if (bytes == null) {
+              LOG.info("null binary entry: batchIndex " + batchIndex + " projection column num " + projectionColumnNum);
+            }
+
+            BytesWritable bytesWritable = (BytesWritable) primitiveWritable;
+            bytesWritable.set(bytes, start, length);
+            return primitiveWritable;
+          }
+        case STRING:
+          {
+            BytesColumnVector bytesColVector =
+                ((BytesColumnVector) batch.cols[projectionColumnNum]);
+            byte[] bytes = bytesColVector.vector[adjustedIndex];
+            int start = bytesColVector.start[adjustedIndex];
+            int length = bytesColVector.length[adjustedIndex];
+
+            if (bytes == null) {
+              LOG.info("null string entry: batchIndex " + batchIndex + " projection column num " + projectionColumnNum);
+            }
+
+            // Use org.apache.hadoop.io.Text as our helper to go from byte[] to String.
+            ((Text) primitiveWritable).set(bytes, start, length);
+            return primitiveWritable;
+          }
+        case VARCHAR:
+          {
+            BytesColumnVector bytesColVector =
+                ((BytesColumnVector) batch.cols[projectionColumnNum]);
+            byte[] bytes = bytesColVector.vector[adjustedIndex];
+            int start = bytesColVector.start[adjustedIndex];
+            int length = bytesColVector.length[adjustedIndex];
+
+            if (bytes == null) {
+              LOG.info("null varchar entry: batchIndex " + batchIndex + " projection column num " + projectionColumnNum);
+            }
+
+            int adjustedLength = StringExpr.truncate(bytes, start, length,
+                maxLengths[logicalColumnIndex]);
+
+            HiveVarcharWritable hiveVarcharWritable = (HiveVarcharWritable) primitiveWritable;
+            hiveVarcharWritable.set(new String(bytes, start, adjustedLength, Charsets.UTF_8), -1);
+            return primitiveWritable;
+          }
+        case CHAR:
+          {
+            BytesColumnVector bytesColVector =
+                ((BytesColumnVector) batch.cols[projectionColumnNum]);
+            byte[] bytes = bytesColVector.vector[adjustedIndex];
+            int start = bytesColVector.start[adjustedIndex];
+            int length = bytesColVector.length[adjustedIndex];
+
+            if (bytes == null) {
+              LOG.info("null char entry: batchIndex " + batchIndex + " projection column num " + projectionColumnNum);
+            }
+
+            int adjustedLength = StringExpr.rightTrimAndTruncate(bytes, start, length,
+                maxLengths[logicalColumnIndex]);
+
+            HiveCharWritable hiveCharWritable = (HiveCharWritable) primitiveWritable;
+            hiveCharWritable.set(new String(bytes, start, adjustedLength, Charsets.UTF_8), -1);
+            return primitiveWritable;
+          }
+        case DECIMAL:
+          ((HiveDecimalWritable) primitiveWritable).set(
+              ((DecimalColumnVector) batch.cols[projectionColumnNum]).vector[adjustedIndex].getHiveDecimal());
+          return primitiveWritable;
+        case INTERVAL_YEAR_MONTH:
+          ((HiveIntervalYearMonthWritable) primitiveWritable).set(
+              (int) ((LongColumnVector) batch.cols[projectionColumnNum]).vector[adjustedIndex]);
+          return primitiveWritable;
+        case INTERVAL_DAY_TIME:
+          ((HiveIntervalDayTimeWritable) primitiveWritable).set(
+              ((IntervalDayTimeColumnVector) batch.cols[projectionColumnNum]).asScratchIntervalDayTime(adjustedIndex));
+          return primitiveWritable;
+        default:
+          throw new RuntimeException("Primitive category " + primitiveCategory.name() +
+              " not supported");
         }
-        // Use org.apache.hadoop.io.Text as our helper to go from byte[] to String.
-        text.set(value, start, length);
-
-        PrimitiveObjectInspectorFactory.writableStringObjectInspector.set(object, text);
-        return object;
-      } else {
-        return null;
-      }
-    }
-  }
-
-  private class VarCharExtractorByValue extends AbstractBytesExtractor {
-
-    // We need our own instance of the VARCHAR object inspector to hold the maximum length
-    // from the TypeInfo.
-    private WritableHiveVarcharObjectInspector writableVarcharObjectInspector;
-
-    // Use org.apache.hadoop.io.Text as our helper to go from byte[] to String.
-    private Text text;
-
-    /*
-     * @param varcharTypeInfo
-     *                      We need the VARCHAR type information that contains the maximum length.
-     * @param columnIndex
-     *                      The vector row batch column that contains the bytes for the VARCHAR.
-     */
-    VarCharExtractorByValue(VarcharTypeInfo varcharTypeInfo, int columnIndex) {
-      super(columnIndex);
-      writableVarcharObjectInspector = new WritableHiveVarcharObjectInspector(varcharTypeInfo);
-      object = writableVarcharObjectInspector.create(new HiveVarchar(StringUtils.EMPTY, -1));
-      text = new Text();
-    }
-
-    @Override
-    Object extract(int batchIndex) {
-      int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex);
-      if (colVector.noNulls || !colVector.isNull[adjustedIndex]) {
-        byte[] value = colVector.vector[adjustedIndex];
-        int start = colVector.start[adjustedIndex];
-        int length = colVector.length[adjustedIndex];
-
-        // Use org.apache.hadoop.io.Text as our helper to go from byte[] to String.
-        text.set(value, start, length);
-
-        writableVarcharObjectInspector.set(object, text.toString());
-        return object;
-      } else {
-        return null;
-      }
-    }
-  }
-
-  private class CharExtractorByValue extends AbstractBytesExtractor {
-
-    // We need our own instance of the CHAR object inspector to hold the maximum length
-    // from the TypeInfo.
-    private WritableHiveCharObjectInspector writableCharObjectInspector;
-
-    // Use org.apache.hadoop.io.Text as our helper to go from byte[] to String.
-    private Text text;
-
-    /*
-     * @param varcharTypeInfo
-     *                      We need the CHAR type information that contains the maximum length.
-     * @param columnIndex
-     *                      The vector row batch column that contains the bytes for the CHAR.
-     */
-    CharExtractorByValue(CharTypeInfo charTypeInfo, int columnIndex) {
-      super(columnIndex);
-      writableCharObjectInspector = new WritableHiveCharObjectInspector(charTypeInfo);
-      object = writableCharObjectInspector.create(new HiveChar(StringUtils.EMPTY, -1));
-      text = new Text();
-    }
-
-    @Override
-    Object extract(int batchIndex) {
-      int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex);
-      if (colVector.noNulls || !colVector.isNull[adjustedIndex]) {
-        byte[] value = colVector.vector[adjustedIndex];
-        int start = colVector.start[adjustedIndex];
-        int length = colVector.length[adjustedIndex];
-
-        // Use org.apache.hadoop.io.Text as our helper to go from byte[] to String.
-        text.set(value, start, length);
-
-        writableCharObjectInspector.set(object, text.toString());
-        return object;
-      } else {
-        return null;
-      }
-    }
-  }
-
-  private class DecimalExtractor extends Extractor {
-
-    private WritableHiveDecimalObjectInspector writableDecimalObjectInspector;
-    protected DecimalColumnVector colVector;
-
-    /*
-     * @param decimalTypeInfo
-     *                      We need the DECIMAL type information that contains scale and precision.
-     * @param columnIndex
-     *                      The vector row batch column that contains the bytes for the VARCHAR.
-     */
-    DecimalExtractor(DecimalTypeInfo decimalTypeInfo, int columnIndex) {
-      super(columnIndex);
-      writableDecimalObjectInspector = new WritableHiveDecimalObjectInspector(decimalTypeInfo);
-      object = writableDecimalObjectInspector.create(HiveDecimal.ZERO);
-    }
-
-    @Override
-    void setColumnVector(VectorizedRowBatch batch) {
-      colVector = (DecimalColumnVector) batch.cols[columnIndex];
-    }
-
-    @Override
-    void forgetColumnVector() {
-      colVector = null;
-    }
-
-    @Override
-    Object extract(int batchIndex) {
-      int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex);
-      if (colVector.noNulls || !colVector.isNull[adjustedIndex]) {
-        HiveDecimal value = colVector.vector[adjustedIndex].getHiveDecimal();
-        writableDecimalObjectInspector.set(object, value);
-        return object;
-      } else {
-        return null;
       }
-    }
-  }
-
-  private Extractor createExtractor(PrimitiveTypeInfo primitiveTypeInfo, int columnIndex) throws HiveException {
-    PrimitiveCategory primitiveCategory = primitiveTypeInfo.getPrimitiveCategory();
-    Extractor extracter;
-    switch (primitiveCategory) {
-    case VOID:
-      extracter = new VoidExtractor(columnIndex);
-      break;
-    case BOOLEAN:
-      extracter = new BooleanExtractor(columnIndex);
-      break;
-    case BYTE:
-      extracter = new ByteExtractor(columnIndex);
-      break;
-    case SHORT:
-      extracter = new ShortExtractor(columnIndex);
-      break;
-    case INT:
-      extracter = new IntExtractor(columnIndex);
-      break;
-    case LONG:
-      extracter = new LongExtractor(columnIndex);
-      break;
-    case TIMESTAMP:
-      extracter = new TimestampExtractor(columnIndex);
-      break;
-    case DATE:
-      extracter = new DateExtractor(columnIndex);
-      break;
-    case FLOAT:
-      extracter = new FloatExtractor(columnIndex);
-      break;
-    case DOUBLE:
-      extracter = new DoubleExtractor(columnIndex);
-      break;
-    case BINARY:
-      extracter = new BinaryExtractorByValue(columnIndex);
-      break;
-    case STRING:
-      extracter = new StringExtractorByValue(columnIndex);
-      break;
-    case VARCHAR:
-      extracter = new VarCharExtractorByValue((VarcharTypeInfo) primitiveTypeInfo, columnIndex);
-      break;
-    case CHAR:
-      extracter = new CharExtractorByValue((CharTypeInfo) primitiveTypeInfo, columnIndex);
-      break;
-    case DECIMAL:
-      extracter = new DecimalExtractor((DecimalTypeInfo) primitiveTypeInfo, columnIndex);
-      break;
-    case INTERVAL_YEAR_MONTH:
-      extracter = new IntervalYearMonthExtractor(columnIndex);
-      break;
-    case INTERVAL_DAY_TIME:
-      extracter = new IntervalDayTimeExtractor(columnIndex);
-      break;
     default:
-      throw new HiveException("No vector row extracter for primitive category " +
-          primitiveCategory);
+      throw new RuntimeException("Category " + category.name() + " not supported");
     }
-    return extracter;
-  }
-
-  Extractor[] extracters;
-
-  public void init(StructObjectInspector structObjectInspector, List<Integer> projectedColumns) throws HiveException {
-
-    extracters = new Extractor[projectedColumns.size()];
-
-    List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs();
-
-    int i = 0;
-    for (StructField field : fields) {
-      int columnIndex = projectedColumns.get(i);
-      ObjectInspector fieldInspector = field.getFieldObjectInspector();
-      PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) TypeInfoUtils.getTypeInfoFromTypeString(
-          fieldInspector.getTypeName());
-      extracters[i] = createExtractor(primitiveTypeInfo, columnIndex);
-      i++;
-    }
-  }
-
-  public void init(List<String> typeNames) throws HiveException {
-
-    extracters = new Extractor[typeNames.size()];
-
-    int i = 0;
-    for (String typeName : typeNames) {
-      PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) TypeInfoUtils.getTypeInfoFromTypeString(typeName);
-      extracters[i] = createExtractor(primitiveTypeInfo, i);
-      i++;
-    }
-  }
-
-  public int getCount() {
-    return extracters.length;
-  }
-
-  protected void setBatch(VectorizedRowBatch batch) throws HiveException {
-
-    for (int i = 0; i < extracters.length; i++) {
-      Extractor extracter = extracters[i];
-      int columnIndex = extracter.getColumnIndex();
-      if (batch.cols[columnIndex] == null) {
-        if (tolerateNullColumns) {
-          // Replace with void...
-          extracter = new VoidExtractor(columnIndex);
-          extracters[i] = extracter;
-        } else {
-          throw new HiveException("Unexpected null vector column " + columnIndex);
-        }
-      }
-      extracter.setColumnVector(batch);
-    }
-  }
-
-  protected void forgetBatch() {
-    for (Extractor extracter : extracters) {
-      extracter.forgetColumnVector();
-    }
-  }
-
-  public Object extractRowColumn(int batchIndex, int logicalColumnIndex) {
-    return extracters[logicalColumnIndex].extract(batchIndex);
   }
 
-  public void extractRow(int batchIndex, Object[] objects) {
-    for (int i = 0; i < extracters.length; i++) {
-      Extractor extracter = extracters[i];
-      objects[i] = extracter.extract(batchIndex);
+  /**
+   * Extract a row object from a VectorizedRowBatch at batchIndex.
+   *
+   * @param batch
+   * @param batchIndex
+   * @param objects
+   */
+  public void extractRow(VectorizedRowBatch batch, int batchIndex, Object[] objects) {
+    for (int i = 0; i < projectionColumnNums.length; i++) {
+      objects[i] = extractRowColumn(batch, batchIndex, i);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRowDynBatch.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRowDynBatch.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRowDynBatch.java
deleted file mode 100644
index 0ff7145..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRowDynBatch.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec.vector;
-
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-
-/**
- * This class extracts specified VectorizedRowBatch row columns into a Writable row Object[].
- *
- * The caller provides the hive type names and target column numbers in the order desired to
- * extract from the Writable row Object[].
- *
- * This class is for use when the batch being assigned is always the same.
- */
-public class VectorExtractRowDynBatch extends VectorExtractRow {
-
-  public void setBatchOnEntry(VectorizedRowBatch batch) throws HiveException {
-    setBatch(batch);
-  }
-
-  public void forgetBatchOnExit() {
-    forgetBatch();
-  }
-}
\ No newline at end of file