You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by mm...@apache.org on 2016/05/03 01:59:48 UTC

[37/40] hive git commit: HIVE-12878: Support Vectorization for TEXTFILE and other formats (Matt McCline, reviewed by Sergey Shelukhin)

http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRowSameBatch.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRowSameBatch.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRowSameBatch.java
deleted file mode 100644
index faec0aa..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRowSameBatch.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec.vector;
-
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-
-/**
- * This class extracts specified VectorizedRowBatch row columns into a Writable row Object[].
- *
- * The caller provides the hive type names and target column numbers in the order desired to
- * extract from the Writable row Object[].
- *
- * This class is for use when the batch being assigned is always the same.
- */
-public class VectorExtractRowSameBatch extends VectorExtractRow {
-
-  public void setOneBatch(VectorizedRowBatch batch) throws HiveException {
-    setBatch(batch);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
index a3082c3..ff88b85 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
@@ -43,7 +43,7 @@ public class VectorFileSinkOperator extends FileSinkOperator {
 
   private transient boolean firstBatch;
 
-  private transient VectorExtractRowDynBatch vectorExtractRowDynBatch;
+  private transient VectorExtractRow vectorExtractRow;
 
   protected transient Object[] singleRow;
 
@@ -80,30 +80,26 @@ public class VectorFileSinkOperator extends FileSinkOperator {
   public void process(Object data, int tag) throws HiveException {
     VectorizedRowBatch batch = (VectorizedRowBatch) data;
     if (firstBatch) {
-      vectorExtractRowDynBatch = new VectorExtractRowDynBatch();
-      vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns());
+      vectorExtractRow = new VectorExtractRow();
+      vectorExtractRow.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns());
 
-      singleRow = new Object[vectorExtractRowDynBatch.getCount()];
+      singleRow = new Object[vectorExtractRow.getCount()];
 
       firstBatch = false;
     }
 
-    vectorExtractRowDynBatch.setBatchOnEntry(batch);
-
     if (batch.selectedInUse) {
       int selected[] = batch.selected;
       for (int logical = 0 ; logical < batch.size; logical++) {
         int batchIndex = selected[logical];
-        vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+        vectorExtractRow.extractRow(batch, batchIndex, singleRow);
         super.process(singleRow, tag);
       }
     } else {
       for (int batchIndex = 0 ; batchIndex < batch.size; batchIndex++) {
-        vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+        vectorExtractRow.extractRow(batch, batchIndex, singleRow);
         super.process(singleRow, tag);
       }
     }
-
-    vectorExtractRowDynBatch.forgetBatchOnExit();
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index f20f614..98a9bf6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -103,7 +103,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
   private transient VectorizedRowBatch outputBatch;
   private transient VectorizedRowBatchCtx vrbCtx;
 
-  private transient VectorAssignRowSameBatch vectorAssignRowSameBatch;
+  private transient VectorAssignRow vectorAssignRow;
 
   private transient int numEntriesHashTable;
 
@@ -823,9 +823,8 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
         vrbCtx = new VectorizedRowBatchCtx();
         vrbCtx.init((StructObjectInspector) outputObjInspector, vOutContext.getScratchColumnTypeNames());
         outputBatch = vrbCtx.createVectorizedRowBatch();
-        vectorAssignRowSameBatch = new VectorAssignRowSameBatch();
-        vectorAssignRowSameBatch.init((StructObjectInspector) outputObjInspector, vOutContext.getProjectedColumns());
-        vectorAssignRowSameBatch.setOneBatch(outputBatch);
+        vectorAssignRow = new VectorAssignRow();
+        vectorAssignRow.init((StructObjectInspector) outputObjInspector, vOutContext.getProjectedColumns());
       }
 
     } catch (HiveException he) {
@@ -912,11 +911,11 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
     } else {
       // Output keys and aggregates into the output batch.
       for (int i = 0; i < outputKeyLength; ++i) {
-        vectorAssignRowSameBatch.assignRowColumn(outputBatch.size, fi++,
+        vectorAssignRow.assignRowColumn(outputBatch, outputBatch.size, fi++,
                 keyWrappersBatch.getWritableKeyValue (kw, i, keyOutputWriters[i]));
       }
       for (int i = 0; i < aggregators.length; ++i) {
-        vectorAssignRowSameBatch.assignRowColumn(outputBatch.size, fi++,
+        vectorAssignRow.assignRowColumn(outputBatch, outputBatch.size, fi++,
                 aggregators[i].evaluateOutput(agg.getAggregationBuffer(i)));
       }
       ++outputBatch.size;
@@ -937,7 +936,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
       throws HiveException {
     int fi = outputKeyLength;   // Start after group keys.
     for (int i = 0; i < aggregators.length; ++i) {
-      vectorAssignRowSameBatch.assignRowColumn(outputBatch.size, fi++,
+      vectorAssignRow.assignRowColumn(outputBatch, outputBatch.size, fi++,
               aggregators[i].evaluateOutput(agg.getAggregationBuffer(i)));
     }
     ++outputBatch.size;

http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
index 6bed52f..902a183 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
@@ -59,7 +59,7 @@ public class VectorMapJoinBaseOperator extends MapJoinOperator implements Vector
   protected transient VectorizedRowBatch outputBatch;
   protected transient VectorizedRowBatch scratchBatch;  // holds restored (from disk) big table rows
 
-  protected transient Map<ObjectInspector, VectorAssignRowSameBatch> outputVectorAssignRowMap;
+  protected transient Map<ObjectInspector, VectorAssignRow> outputVectorAssignRowMap;
 
   protected transient VectorizedRowBatchCtx vrbCtx = null;
 
@@ -100,7 +100,7 @@ public class VectorMapJoinBaseOperator extends MapJoinOperator implements Vector
 
     outputBatch = vrbCtx.createVectorizedRowBatch();
 
-    outputVectorAssignRowMap = new HashMap<ObjectInspector, VectorAssignRowSameBatch>();
+    outputVectorAssignRowMap = new HashMap<ObjectInspector, VectorAssignRow>();
   }
 
   /**
@@ -109,15 +109,14 @@ public class VectorMapJoinBaseOperator extends MapJoinOperator implements Vector
   @Override
   protected void internalForward(Object row, ObjectInspector outputOI) throws HiveException {
     Object[] values = (Object[]) row;
-    VectorAssignRowSameBatch va = outputVectorAssignRowMap.get(outputOI);
+    VectorAssignRow va = outputVectorAssignRowMap.get(outputOI);
     if (va == null) {
-      va = new VectorAssignRowSameBatch();
+      va = new VectorAssignRow();
       va.init((StructObjectInspector) outputOI, vOutContext.getProjectedColumns());
-      va.setOneBatch(outputBatch);
       outputVectorAssignRowMap.put(outputOI, va);
     }
 
-    va.assignRow(outputBatch.size, values);
+    va.assignRow(outputBatch, outputBatch.size, values);
 
     ++outputBatch.size;
     if (outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {

http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
index e8f4471..3323df3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
@@ -181,7 +181,9 @@ public class VectorMapJoinOperator extends VectorMapJoinBaseOperator {
     joinValues[posBigTable] = vectorNodeEvaluators;
 
     // Filtering is handled in the input batch processing
-    filterMaps[posBigTable] = null;
+    if (filterMaps != null) {
+      filterMaps[posBigTable] = null;
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java
index 0fe1188..22bebb0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java
@@ -44,7 +44,7 @@ public class VectorMapJoinOuterFilteredOperator extends VectorMapJoinBaseOperato
 
   private transient boolean firstBatch;
 
-  private transient VectorExtractRowDynBatch vectorExtractRowDynBatch;
+  private transient VectorExtractRow vectorExtractRow;
 
   protected transient Object[] singleRow;
 
@@ -94,33 +94,28 @@ public class VectorMapJoinOuterFilteredOperator extends VectorMapJoinBaseOperato
     }
 
     if (firstBatch) {
-      vectorExtractRowDynBatch = new VectorExtractRowDynBatch();
-      vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns());
+      vectorExtractRow = new VectorExtractRow();
+      vectorExtractRow.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns());
 
-      singleRow = new Object[vectorExtractRowDynBatch.getCount()];
+      singleRow = new Object[vectorExtractRow.getCount()];
 
       firstBatch = false;
     }
 
-
-    vectorExtractRowDynBatch.setBatchOnEntry(batch);
-
     // VectorizedBatchUtil.debugDisplayBatch( batch, "VectorReduceSinkOperator processOp ");
 
     if (batch.selectedInUse) {
       int selected[] = batch.selected;
       for (int logical = 0 ; logical < batch.size; logical++) {
         int batchIndex = selected[logical];
-        vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+        vectorExtractRow.extractRow(batch, batchIndex, singleRow);
         super.process(singleRow, tag);
       }
     } else {
       for (int batchIndex = 0 ; batchIndex < batch.size; batchIndex++) {
-        vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+        vectorExtractRow.extractRow(batch, batchIndex, singleRow);
         super.process(singleRow, tag);
       }
     }
-
-    vectorExtractRowDynBatch.forgetBatchOnExit();
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
index 9f0c24e..6979956 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
@@ -18,20 +18,422 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.ql.exec.MapOperator;
+import org.apache.hadoop.hive.ql.exec.AbstractMapOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;
+import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc.VectorMapOperatorReadType;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.fast.DeserializeRead;
+import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.fast.LazySimpleDeserializeRead;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BinaryComparable;
 import org.apache.hadoop.io.Writable;
 
-import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
-public class VectorMapOperator extends MapOperator {
+/*
+ *
+ * The vectorized MapOperator.
+ *
+ * There are 3 modes of reading for vectorization:
+ *
+ *   1) One for the Vectorized Input File Format which returns VectorizedRowBatch as the row.
+ *
+ *   2) One for using VectorDeserializeRow to deserialize each row into the VectorizedRowBatch.
+ *      Currently, this mode is used for these Input File Formats:
+ *        TEXTFILE
+ *        SEQUENCEFILE
+ *
+ *   3) And one using the regular partition deserializer to get the row object and assigning
+ *      the row object into the VectorizedRowBatch with VectorAssignRow.
+ *      This picks up any Input File Format not supported by the other two.
+ */
+public class VectorMapOperator extends AbstractMapOperator {
 
   private static final long serialVersionUID = 1L;
 
+  /*
+   * Overall information on this vectorized Map operation.
+   */
+  private transient HashMap<String, VectorPartitionContext> fileToPartitionContextMap;
+
+  private transient Operator<? extends OperatorDesc> oneRootOperator;
+
+  private transient TypeInfo tableStructTypeInfo;
+  private transient StandardStructObjectInspector tableStandardStructObjectInspector;
+
+  private transient TypeInfo[] tableRowTypeInfos;
+
+  private transient VectorizedRowBatchCtx batchContext;
+              // The context for creating the VectorizedRowBatch for this Map node that
+              // the Vectorizer class determined.
+
+  /*
+   * A different batch for vectorized Input File Format readers so they can do their work
+   * overlapped with work of the row collection that vector/row deserialization does.  This allows
+   * the partitions to mix modes (e.g. for us to flush the previously batched rows on file change).
+   */
+  private transient VectorizedRowBatch vectorizedInputFileFormatBatch;
+
+  /*
+   * This batch is only used by vector/row deserializer readers.
+   */
+  private transient VectorizedRowBatch deserializerBatch;
+
+  private transient long batchCounter;
+
+  private transient int dataColumnCount;
+  private transient int partitionColumnCount;
+  private transient Object[] partitionValues;
+
+  private transient boolean[] columnsToIncludeTruncated;
+
+  /*
+   * The following members have context information for the current partition file being read.
+   */
+  private transient VectorMapOperatorReadType currentReadType;
+  private transient VectorPartitionContext currentVectorPartContext;
+                  // Current vector map operator read type and context.
+
+  private transient int currentDataColumnCount;
+                  // The number of data columns that the current reader will return.
+                  // Only applicable for vector/row deserialization.
+
+  private transient DeserializeRead currentDeserializeRead;
+  private transient VectorDeserializeRow currentVectorDeserializeRow;
+                  // When we are doing vector deserialization, these are the fast deserializer and
+                  // the vector row deserializer.
+
+  private Deserializer currentPartDeserializer;
+  private StructObjectInspector currentPartRawRowObjectInspector;
+  private VectorAssignRow currentVectorAssign;
+                  // When we are doing row deserialization, these are the regular deserializer,
+                  // partition object inspector, and vector row assigner.
+
+  /*
+   * The abstract context for the 3 kinds of vectorized reading.
+   */
+  protected abstract class VectorPartitionContext {
+
+    protected final PartitionDesc partDesc;
+
+    String tableName;
+    String partName;
+
+    /*
+     * Initialization here is adapted from MapOperator.MapOpCtx.initObjectInspector method.
+     */
+    private VectorPartitionContext(PartitionDesc partDesc) {
+      this.partDesc = partDesc;
+
+      TableDesc td = partDesc.getTableDesc();
+
+      // Use table properties in case of unpartitioned tables,
+      // and the union of table properties and partition properties, with partition
+      // taking precedence, in the case of partitioned tables
+      Properties overlayedProps =
+          SerDeUtils.createOverlayedProperties(td.getProperties(), partDesc.getProperties());
+
+      Map<String, String> partSpec = partDesc.getPartSpec();
+
+      tableName = String.valueOf(overlayedProps.getProperty("name"));
+      partName = String.valueOf(partSpec);
+
+    }
+
+    public PartitionDesc getPartDesc() {
+      return partDesc;
+    }
+
+    /*
+     * Override this for concrete initialization.
+     */
+    public abstract void init(Configuration hconf)
+        throws SerDeException, Exception;
+
+    /*
+     * How many data columns is the partition reader actually supplying?
+     */
+    public abstract int getReaderDataColumnCount();
+  }
+
+  /*
+   * Context for reading a Vectorized Input File Format.
+   */
+  protected class VectorizedInputFileFormatPartitionContext extends VectorPartitionContext {
+
+    private VectorizedInputFileFormatPartitionContext(PartitionDesc partDesc) {
+      super(partDesc);
+    }
+
+    public void init(Configuration hconf) {
+    }
+
+    @Override
+    public int getReaderDataColumnCount() {
+      throw new RuntimeException("Not applicable");
+    }
+  }
+
+  /*
+   * Context for using VectorDeserializeRow to deserialize each row from the Input File Format
+   * into the VectorizedRowBatch.
+   */
+  protected class VectorDeserializePartitionContext extends VectorPartitionContext {
+
+    // This helper object deserializes rows of a known deserialization / input file format
+    // combination into columns of a row in a vectorized row batch.
+    private VectorDeserializeRow vectorDeserializeRow;
+
+    private DeserializeRead deserializeRead;
+
+    private int readerColumnCount;
+
+    private VectorDeserializePartitionContext(PartitionDesc partDesc) {
+      super(partDesc);
+    }
+
+    public VectorDeserializeRow getVectorDeserializeRow() {
+      return vectorDeserializeRow;
+    }
+
+    DeserializeRead getDeserializeRead() {
+      return deserializeRead;
+    }
+
+    @Override
+    public int getReaderDataColumnCount() {
+      return readerColumnCount;
+    }
+
+    public void init(Configuration hconf)
+        throws SerDeException, HiveException {
+      VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc();
+
+      // This type information specifies the data types the partition needs to read.
+      TypeInfo[] dataTypeInfos = vectorPartDesc.getDataTypeInfos();
+
+      switch (vectorPartDesc.getVectorDeserializeType()) {
+      case LAZY_SIMPLE:
+        {
+          LazySerDeParameters simpleSerdeParams =
+              new LazySerDeParameters(hconf, partDesc.getTableDesc().getProperties(),
+                  LazySimpleSerDe.class.getName());
+
+          LazySimpleDeserializeRead lazySimpleDeserializeRead =
+              new LazySimpleDeserializeRead(dataTypeInfos, simpleSerdeParams);
+
+          vectorDeserializeRow =
+              new VectorDeserializeRow<LazySimpleDeserializeRead>(lazySimpleDeserializeRead);
+
+          // Initialize with data row type conversion parameters.
+          readerColumnCount =
+              vectorDeserializeRow.initConversion(tableRowTypeInfos, columnsToIncludeTruncated);
+
+          deserializeRead = lazySimpleDeserializeRead;
+        }
+        break;
+
+      case LAZY_BINARY:
+        {
+          LazyBinaryDeserializeRead lazyBinaryDeserializeRead =
+              new LazyBinaryDeserializeRead(dataTypeInfos);
+
+          vectorDeserializeRow =
+              new VectorDeserializeRow<LazyBinaryDeserializeRead>(lazyBinaryDeserializeRead);
+
+          // Initialize with data row type conversion parameters.
+          readerColumnCount =
+              vectorDeserializeRow.initConversion(tableRowTypeInfos, columnsToIncludeTruncated);
+
+          deserializeRead = lazyBinaryDeserializeRead;
+        }
+        break;
+
+      default:
+        throw new RuntimeException(
+            "Unexpected vector deserialize row type " + vectorPartDesc.getVectorDeserializeType().name());
+      }
+    }
+  }
+
+  /*
+   * Context for reading using the regular partition deserializer to get the row object and
+   * assigning the row object into the VectorizedRowBatch with VectorAssignRow.
+   */
+  protected class RowDeserializePartitionContext extends VectorPartitionContext {
+
+    private Deserializer partDeserializer;
+    private StructObjectInspector partRawRowObjectInspector;
+    private VectorAssignRow vectorAssign;
+
+    private int readerColumnCount;
+
+    private RowDeserializePartitionContext(PartitionDesc partDesc) {
+      super(partDesc);
+    }
+
+    public Deserializer getPartDeserializer() {
+      return partDeserializer;
+    }
+
+    public StructObjectInspector getPartRawRowObjectInspector() {
+      return partRawRowObjectInspector;
+    }
+
+    public VectorAssignRow getVectorAssign() {
+      return vectorAssign;
+    }
+
+    @Override
+    public int getReaderDataColumnCount() {
+      return readerColumnCount;
+    }
+
+    public void init(Configuration hconf)
+        throws Exception {
+      VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc();
+
+      partDeserializer = partDesc.getDeserializer(hconf);
+
+      if (partDeserializer instanceof OrcSerde) {
+
+        // UNDONE: We need to get the table schema inspector from self-describing Input File
+        //         Formats like ORC.  Modify the ORC serde instead?  For now, this works.
+
+        partRawRowObjectInspector =
+            (StructObjectInspector) OrcStruct.createObjectInspector(tableStructTypeInfo);
+
+      } else {
+        partRawRowObjectInspector =
+            (StructObjectInspector) partDeserializer.getObjectInspector();
+      }
+
+      TypeInfo[] dataTypeInfos = vectorPartDesc.getDataTypeInfos();
+
+      vectorAssign = new VectorAssignRow();
+
+      // Initialize with data type conversion parameters.
+      readerColumnCount =
+          vectorAssign.initConversion(dataTypeInfos, tableRowTypeInfos, columnsToIncludeTruncated);
+    }
+  }
+
+  public VectorPartitionContext createAndInitPartitionContext(PartitionDesc partDesc,
+      Configuration hconf)
+          throws SerDeException, Exception {
+
+    VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc();
+    VectorPartitionContext vectorPartitionContext;
+    VectorMapOperatorReadType vectorMapOperatorReadType =
+        vectorPartDesc.getVectorMapOperatorReadType();
+
+    if (vectorMapOperatorReadType == VectorMapOperatorReadType.VECTOR_DESERIALIZE ||
+        vectorMapOperatorReadType == VectorMapOperatorReadType.ROW_DESERIALIZE) {
+      // Verify hive.exec.schema.evolution is true or we have an ACID table, so we are producing
+      // the table schema from ORC.  The Vectorizer class ensures this.
+      boolean isAcid =
+          AcidUtils.isTablePropertyTransactional(partDesc.getTableDesc().getProperties());
+      Preconditions.checkState(Utilities.isSchemaEvolutionEnabled(hconf, isAcid));
+    }
+
+    switch (vectorMapOperatorReadType) {
+    case VECTORIZED_INPUT_FILE_FORMAT:
+      vectorPartitionContext = new VectorizedInputFileFormatPartitionContext(partDesc);
+      break;
+
+    case VECTOR_DESERIALIZE:
+      vectorPartitionContext = new VectorDeserializePartitionContext(partDesc);
+      break;
+
+    case ROW_DESERIALIZE:
+      vectorPartitionContext = new RowDeserializePartitionContext(partDesc);
+      break;
+
+    default:
+      throw new RuntimeException("Unexpected vector MapOperator read type " +
+          vectorMapOperatorReadType.name());
+    }
+
+    vectorPartitionContext.init(hconf);
+
+    return vectorPartitionContext;
+  }
+
+  private void determineColumnsToInclude(Configuration hconf) {
+
+    columnsToIncludeTruncated = null;
+
+    List<Integer> columnsToIncludeTruncatedList = ColumnProjectionUtils.getReadColumnIDs(hconf);
+    if (columnsToIncludeTruncatedList != null &&
+        columnsToIncludeTruncatedList.size() > 0 && columnsToIncludeTruncatedList.size() < dataColumnCount ) {
+
+      // Partition columns will not be in the include list.
+
+      boolean[] columnsToInclude = new boolean[dataColumnCount];
+      Arrays.fill(columnsToInclude, false);
+      for (int columnNum : columnsToIncludeTruncatedList) {
+        columnsToInclude[columnNum] = true;
+      }
+
+      // Work backwards to find the highest wanted column.
+
+      int highestWantedColumnNum = -1;
+      for (int i = dataColumnCount - 1; i >= 0; i--) {
+        if (columnsToInclude[i]) {
+          highestWantedColumnNum = i;
+          break;
+        }
+      }
+      if (highestWantedColumnNum == -1) {
+        throw new RuntimeException("No columns to include?");
+      }
+      int newColumnCount = highestWantedColumnNum + 1;
+      if (newColumnCount == dataColumnCount) {
+        columnsToIncludeTruncated = columnsToInclude;
+      } else {
+        columnsToIncludeTruncated = Arrays.copyOf(columnsToInclude, newColumnCount);
+      }
+    }
+  }
+
   /** Kryo ctor. */
-  @VisibleForTesting
   public VectorMapOperator() {
     super();
   }
@@ -40,29 +442,445 @@ public class VectorMapOperator extends MapOperator {
     super(ctx);
   }
 
+
+  /*
+   * Counterpart of setChildren for empty tables: the root operator is passed in directly.
+   */
+  @Override
+  public void initEmptyInputChildren(List<Operator<?>> children, Configuration hconf)
+    throws SerDeException, Exception {
+
+    // Vectorization supports a single input tree, so expect exactly one TableScanOperator.
+    final boolean hasSingleRoot = (children.size() == 1);
+    Preconditions.checkState(hasSingleRoot);
+    oneRootOperator = children.get(0);
+    internalSetChildren(hconf);
+  }
+
+  @Override
+  public void setChildren(Configuration hconf) throws Exception {
+    // Vectorization supports a single input tree, so expect exactly one root operator.
+    Iterator<Operator<? extends OperatorDesc>> rootOperators =
+        conf.getAliasToWork().values().iterator();
+    oneRootOperator = rootOperators.next();
+    final boolean hasSecondRoot = rootOperators.hasNext();
+    Preconditions.checkState(!hasSecondRoot);
+
+    internalSetChildren(hconf);
+  }
+
+  /*
+   * Create information for vector map operator: the row batches, the table type objects and
+   * the map from input path to partition context.
+   * The member oneRootOperator has been set.
+   */
+  private void internalSetChildren(Configuration hconf) throws Exception {
+
+    // Start with read type NONE; setupPartitionContextVars assigns the real one for each file.
+    currentReadType = VectorMapOperatorReadType.NONE;
+
+    // NOTE(review): determineColumnsToInclude reads dataColumnCount, but that member is only
+    // assigned further down in this method -- confirm this ordering is intended.
+    determineColumnsToInclude(hconf);
+
+    batchContext = conf.getVectorizedRowBatchCtx();
+
+    /*
+     * Use a different batch for vectorized Input File Format readers so they can do their work
+     * overlapped with work of the row collection that vector/row deserialization does.  This allows
+     * the partitions to mix modes (e.g. for us to flush the previously batched rows on file change).
+     */
+    vectorizedInputFileFormatBatch =
+        batchContext.createVectorizedRowBatch(columnsToIncludeTruncated);
+    conf.setVectorizedRowBatch(vectorizedInputFileFormatBatch);
+
+    // This batch is used by vector/row deserializer readers.
+    deserializerBatch = batchContext.createVectorizedRowBatch(columnsToIncludeTruncated);
+
+    batchCounter = 0;
+
+    dataColumnCount = batchContext.getDataColumnCount();
+    partitionColumnCount = batchContext.getPartitionColumnCount();
+    partitionValues = new Object[partitionColumnCount];
+
+    /*
+     * Create the table-level struct type info and its standard writable object inspector.
+     */
+    tableStructTypeInfo =
+        TypeInfoFactory.getStructTypeInfo(
+          Arrays.asList(batchContext.getRowColumnNames()),
+          Arrays.asList(batchContext.getRowColumnTypeInfos()));
+    tableStandardStructObjectInspector =
+        (StandardStructObjectInspector)
+            TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(tableStructTypeInfo);
+
+    tableRowTypeInfos = batchContext.getRowColumnTypeInfos();
+
+    /*
+     * The Vectorizer class enforces that there is only one TableScanOperator, so
+     * we don't need the more complicated multiple root operator mapping that MapOperator has.
+     */
+    fileToPartitionContextMap = new HashMap<String, VectorPartitionContext>();
+
+    // Temporary map so that paths sharing a PartitionDesc also share one partition context.
+    HashMap<PartitionDesc, VectorPartitionContext> partitionContextMap =
+        new HashMap<PartitionDesc, VectorPartitionContext>();
+
+    for (Map.Entry<String, ArrayList<String>> entry : conf.getPathToAliases().entrySet()) {
+      String path = entry.getKey();
+      PartitionDesc partDesc = conf.getPathToPartitionInfo().get(path);
+      ArrayList<String> aliases = entry.getValue();
+
+      VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc();
+      LOG.info("VectorMapOperator path: " + path +
+          ", read type " + vectorPartDesc.getVectorMapOperatorReadType().name() +
+          ", vector deserialize type " + vectorPartDesc.getVectorDeserializeType().name() +
+          ", aliases " + aliases);
+
+      VectorPartitionContext vectorPartitionContext;
+      if (!partitionContextMap.containsKey(partDesc)) {
+        vectorPartitionContext = createAndInitPartitionContext(partDesc, hconf);
+        partitionContextMap.put(partDesc, vectorPartitionContext);
+      } else {
+        vectorPartitionContext = partitionContextMap.get(partDesc);
+      }
+
+      fileToPartitionContextMap.put(path, vectorPartitionContext);
+    }
+
+    // The single root operator becomes this operator's only child.
+    List<Operator<? extends OperatorDesc>> children =
+        new ArrayList<Operator<? extends OperatorDesc>>();
+    children.add(oneRootOperator);
+
+    setChildOperators(children);
+  }
+
+  @Override
+  public void initializeMapOperator(Configuration hconf) throws HiveException {
+    super.initializeMapOperator(hconf);
+    final ObjectInspector[] rootInputInspectors = {tableStandardStructObjectInspector};
+    oneRootOperator.initialize(hconf, rootInputInspectors);
+  }
+
+  public void initializeContexts() throws HiveException {
+    // Select the partition context that belongs to the current input path.
+    final Path currentInputPath = getExecContext().getCurrentInputPath();
+    setupPartitionContextVars(getNominalPath(currentInputPath));
+  }
+
+  // Switch to the partition context of the input file that has just become current.
+  @Override
+  public void cleanUpInputFileChangedOp() throws HiveException {
+    super.cleanUpInputFileChangedOp();
+
+    final Path currentInputPath = getExecContext().getCurrentInputPath();
+    setupPartitionContextVars(getNominalPath(currentInputPath));
+
+    // Hand the table name and partition name of the new partition context to the root
+    // operator so that the operators below it inherit them.
+    oneRootOperator.setInputContext(
+        currentVectorPartContext.tableName,
+        currentVectorPartContext.partName);
+  }
+
+  /*
+   * Setup the context for reading from the next partition file.
+   *
+   * Rows still batched from the previous file are flushed to the root operator first.
+   * Throws HiveException when no partition context is registered for nominalPath.
+   */
+  private void setupPartitionContextVars(String nominalPath) throws HiveException {
+
+    currentVectorPartContext = fileToPartitionContextMap.get(nominalPath);
+    if (currentVectorPartContext == null) {
+      throw new HiveException("No vector partition context for input path " + nominalPath);
+    }
+    PartitionDesc partDesc = currentVectorPartContext.getPartDesc();
+    VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc();
+    currentReadType = vectorPartDesc.getVectorMapOperatorReadType();
+
+    if (deserializerBatch.size > 0) {
+
+      /*
+       * Flush the rows batched from the previous file no matter what the new read type is.
+       * The repeating partition column values may be about to change, and rows left behind
+       * while Vectorized Input File Format batches pass through would be stranded.
+       */
+      batchCounter++;
+      oneRootOperator.process(deserializerBatch, 0);
+      deserializerBatch.reset();
+      if (oneRootOperator.getDone()) {
+        setDone(true);
+        return;
+      }
+    }
+
+    /*
+     * Setup for 3 different kinds of vectorized reading supported:
+     *
+     *   1) Read the Vectorized Input File Format which returns VectorizedRowBatch as the row.
+     *
+     *   2) Read using VectorDeserializeRow to deserialize each row into the VectorizedRowBatch.
+     *
+     *   3) And read using the regular partition deserializer to get the row object and assigning
+     *      the row object into the VectorizedRowBatch with VectorAssignRow.
+     */
+    if (currentReadType == VectorMapOperatorReadType.VECTORIZED_INPUT_FILE_FORMAT) {
+
+      /*
+       * The Vectorized Input File Format reader is responsible for setting the partition column
+       * values, resetting and filling in the batch, etc.  Clear all the reading variables.
+       */
+      currentDataColumnCount = 0;
+
+      currentDeserializeRead = null;
+      currentVectorDeserializeRow = null;
+
+      currentPartDeserializer = null;
+      currentPartRawRowObjectInspector = null;
+      currentVectorAssign = null;
+
+    } else {
+
+      /*
+       * We will get "regular" single rows from the Input File Format reader that we will need
+       * to {vector|row} deserialize.
+       */
+      Preconditions.checkState(
+          currentReadType == VectorMapOperatorReadType.VECTOR_DESERIALIZE ||
+          currentReadType == VectorMapOperatorReadType.ROW_DESERIALIZE);
+
+      // For this particular file, how many columns will we actually read?
+      currentDataColumnCount = currentVectorPartContext.getReaderDataColumnCount();
+
+      if (currentDataColumnCount < dataColumnCount) {
+
+        /*
+         * Default any additional data columns to NULL once for the file.
+         */
+        for (int i = currentDataColumnCount; i < dataColumnCount; i++) {
+          ColumnVector colVector = deserializerBatch.cols[i];
+          colVector.isNull[0] = true;
+          colVector.noNulls = false;
+          colVector.isRepeating = true;
+        }
+      }
+
+      if (batchContext.getPartitionColumnCount() > 0) {
+
+        /*
+         * The partition columns are set once for the partition and are marked repeating.
+         */
+        VectorizedRowBatchCtx.getPartitionValues(batchContext, partDesc, partitionValues);
+        batchContext.addPartitionColsToBatch(deserializerBatch, partitionValues);
+      }
+
+      /*
+       * Set or clear the rest of the reading variables based on {vector|row} deserialization.
+       */
+      switch (currentReadType) {
+      case VECTOR_DESERIALIZE:
+        {
+          VectorDeserializePartitionContext vectorDeserPartContext =
+              (VectorDeserializePartitionContext) currentVectorPartContext;
+
+          // Set ours.
+          currentDeserializeRead = vectorDeserPartContext.getDeserializeRead();
+          currentVectorDeserializeRow = vectorDeserPartContext.getVectorDeserializeRow();
+
+          // Clear the other ones.
+          currentPartDeserializer = null;
+          currentPartRawRowObjectInspector = null;
+          currentVectorAssign = null;
+
+        }
+        break;
+
+      case ROW_DESERIALIZE:
+        {
+          RowDeserializePartitionContext rowDeserPartContext =
+              (RowDeserializePartitionContext) currentVectorPartContext;
+
+          // Clear the other ones.
+          currentDeserializeRead = null;
+          currentVectorDeserializeRow = null;
+
+          // Set ours.
+          currentPartDeserializer = rowDeserPartContext.getPartDeserializer();
+          currentPartRawRowObjectInspector = rowDeserPartContext.getPartRawRowObjectInspector();
+          currentVectorAssign = rowDeserPartContext.getVectorAssign();
+        }
+        break;
+
+      default:
+        throw new RuntimeException("Unexpected VectorMapOperator read type " +
+            currentReadType.name());
+      }
+    }
+  }
+
+  @Override
+  public Deserializer getCurrentDeserializer() {
+    // Not applicable: always null.  Row deserializers are obtained from the partition contexts.
+    return null;
+  }
+
   @Override
   public void process(Writable value) throws HiveException {
+    // value is a whole VectorizedRowBatch or a single row, depending on currentReadType.
     // A mapper can span multiple files/partitions.
-    // The serializers need to be reset if the input file changed
+    // The VectorPartitionContext needs to be switched if the input file changed
     ExecMapperContext context = getExecContext();
     if (context != null && context.inputFileChanged()) {
       // The child operators cleanup if input file has changed
       cleanUpInputFileChanged();
     }
 
-    // The row has been converted to comply with table schema, irrespective of partition schema.
-    // So, use tblOI (and not partOI) for forwarding
-    try {
-      int childrenDone = 0;
-      for (MapOpCtx current : currentCtxs) {
-        if (!current.forward(value)) {
-          childrenDone++;
+    if (!oneRootOperator.getDone()) {
+
+      /*
+       * 3 different kinds of vectorized reading supported:
+       *
+       *   1) Read the Vectorized Input File Format which returns VectorizedRowBatch as the row.
+       *
+       *   2) Read using VectorDeserializeRow to deserialize each row into the VectorizedRowBatch.
+       *
+       *   3) And read using the regular partition deserializer to get the row object and assigning
+       *      the row object into the VectorizedRowBatch with VectorAssignRow.
+       */
+      try {
+        if (currentReadType == VectorMapOperatorReadType.VECTORIZED_INPUT_FILE_FORMAT) {
+
+          /*
+           * The Vectorized Input File Format reader has already set the partition column
+           * values, reset and filled in the batch, etc.
+           *
+           * We pass the VectorizedRowBatch through here.
+           */
+          batchCounter++;
+          oneRootOperator.process(value, 0);
+          if (oneRootOperator.getDone()) {
+            setDone(true);
+            return;
+          }
+
+        } else {
+
+          /*
+           * We have a "regular" single row from the Input File Format reader that we will need
+           * to deserialize.
+           */
+          Preconditions.checkState(
+              currentReadType == VectorMapOperatorReadType.VECTOR_DESERIALIZE ||
+              currentReadType == VectorMapOperatorReadType.ROW_DESERIALIZE);
+
+          if (deserializerBatch.size == deserializerBatch.DEFAULT_SIZE) {
+
+            /*
+             * Feed current full batch to operator tree.
+             */
+            batchCounter++;
+            oneRootOperator.process(deserializerBatch, 0);
+
+            /*
+             * Only reset the current data columns.  Not any data columns defaulted to NULL
+             * because they are not present in the partition, and not partition columns.
+             */
+            for (int c = 0; c < currentDataColumnCount; c++) {
+              deserializerBatch.cols[c].reset();
+              deserializerBatch.cols[c].init();
+            }
+            deserializerBatch.selectedInUse = false;
+            deserializerBatch.size = 0;
+            deserializerBatch.endOfFile = false;
+
+            if (oneRootOperator.getDone()) {
+              setDone(true);
+              return;
+            }
+          }
+
+          /*
+           * Do the {vector|row} deserialization of the one row into the VectorizedRowBatch.
+           */
+          switch (currentReadType) {
+          case VECTOR_DESERIALIZE:
+            {
+              BinaryComparable binComp = (BinaryComparable) value;
+              currentDeserializeRead.set(binComp.getBytes(), 0, binComp.getLength());
+
+              // Deserialize and append new row using the current batch size as the index.
+              currentVectorDeserializeRow.deserialize(deserializerBatch, deserializerBatch.size++);
+            }
+            break;
+
+          case ROW_DESERIALIZE:
+            {
+              Object deserialized = currentPartDeserializer.deserialize(value);
+
+              // Note: Regardless of what the Input File Format returns, we have determined
+              // with VectorAppendRow.initConversion that only currentDataColumnCount columns
+              // have values we want.
+              //
+              // Any extra columns needed by the table schema were set to repeating null
+              // in the batch by setupPartitionContextVars.
+
+              // Convert input row to standard objects.
+              List<Object> standardObjects = new ArrayList<Object>();
+              ObjectInspectorUtils.copyToStandardObject(standardObjects, deserialized,
+                  currentPartRawRowObjectInspector, ObjectInspectorCopyOption.WRITABLE);
+              if (standardObjects.size() < currentDataColumnCount) {
+                throw new HiveException("Input File Format returned row with too few columns");
+              }
+
+              // Append the deserialized standard object row using the current batch size
+              // as the index.
+              currentVectorAssign.assignRow(deserializerBatch, deserializerBatch.size++,
+                  standardObjects, currentDataColumnCount);
+            }
+            break;
+
+          default:
+            throw new RuntimeException("Unexpected vector MapOperator read type " +
+                currentReadType.name());
+          }
         }
+      } catch (Exception e) {
+        throw new HiveException("Hive Runtime Error while processing row ", e);
       }
+    }
+  }
+
+  @Override
+  public void process(Object row, int tag) throws HiveException {
+    throw new HiveException("Hive 2 Internal error: should not be called!");
+  }
 
-      rowsForwarded(childrenDone, ((VectorizedRowBatch)value).size);
-    } catch (Exception e) {
-      throw new HiveException("Hive Runtime Error while processing row ", e);
+  @Override
+  public void closeOp(boolean abort) throws HiveException {
+    // Forward rows still batched by {vector|row} deserialization, whatever the last read type.
+    if (!abort && oneRootOperator != null && !oneRootOperator.getDone()) {
+      if (deserializerBatch != null && deserializerBatch.size > 0) {
+        batchCounter++;
+        oneRootOperator.process(deserializerBatch, 0);
+        deserializerBatch.size = 0;
+      }
     }
+    super.closeOp(abort);
+  }
+
+  @Override
+  public String getName() {
+    return getOperatorName();
+  }
+
+  public static String getOperatorName() {
+    return "MAP";
+  }
+
+  @Override
+  public OperatorType getType() {
+    return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
index 74e5130..dd5e20f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
@@ -40,7 +40,7 @@ public class VectorReduceSinkOperator extends ReduceSinkOperator {
 
   private transient boolean firstBatch;
 
-  private transient VectorExtractRowDynBatch vectorExtractRowDynBatch;
+  private transient VectorExtractRow vectorExtractRow;
 
   protected transient Object[] singleRow;
 
@@ -81,32 +81,28 @@ public class VectorReduceSinkOperator extends ReduceSinkOperator {
 
     VectorizedRowBatch batch = (VectorizedRowBatch) data;
     if (firstBatch) {
-      vectorExtractRowDynBatch = new VectorExtractRowDynBatch();
-      vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns());
+      vectorExtractRow = new VectorExtractRow();
+      vectorExtractRow.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns());
 
-      singleRow = new Object[vectorExtractRowDynBatch.getCount()];
+      singleRow = new Object[vectorExtractRow.getCount()];
 
       firstBatch = false;
     }
 
-    vectorExtractRowDynBatch.setBatchOnEntry(batch);
-
     // VectorizedBatchUtil.debugDisplayBatch( batch, "VectorReduceSinkOperator processOp ");
 
     if (batch.selectedInUse) {
       int selected[] = batch.selected;
       for (int logical = 0 ; logical < batch.size; logical++) {
         int batchIndex = selected[logical];
-        vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+        vectorExtractRow.extractRow(batch, batchIndex, singleRow);
         super.process(singleRow, tag);
       }
     } else {
       for (int batchIndex = 0 ; batchIndex < batch.size; batchIndex++) {
-        vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+        vectorExtractRow.extractRow(batch, batchIndex, singleRow);
         super.process(singleRow, tag);
       }
     }
-
-    vectorExtractRowDynBatch.forgetBatchOnExit();
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
index 85c8506..59153c8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
@@ -76,7 +76,7 @@ public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements Vect
 
   private transient VectorHashKeyWrapperBatch keyWrapperBatch;
 
-  private transient Map<ObjectInspector, VectorAssignRowSameBatch> outputVectorAssignRowMap;
+  private transient Map<ObjectInspector, VectorAssignRow> outputVectorAssignRowMap;
 
   private transient int batchIndex = -1;
 
@@ -159,7 +159,7 @@ public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements Vect
 
     keyWrapperBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
 
-    outputVectorAssignRowMap = new HashMap<ObjectInspector, VectorAssignRowSameBatch>();
+    outputVectorAssignRowMap = new HashMap<ObjectInspector, VectorAssignRow>();
 
     // This key evaluator translates from the vectorized VectorHashKeyWrapper format
     // into the row-mode MapJoinKey
@@ -287,15 +287,14 @@ public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements Vect
   @Override
   protected void internalForward(Object row, ObjectInspector outputOI) throws HiveException {
     Object[] values = (Object[]) row;
-    VectorAssignRowSameBatch va = outputVectorAssignRowMap.get(outputOI);
+    VectorAssignRow va = outputVectorAssignRowMap.get(outputOI);
     if (va == null) {
-      va = new VectorAssignRowSameBatch();
+      va = new VectorAssignRow();
       va.init((StructObjectInspector) outputOI, vOutContext.getProjectedColumns());
-      va.setOneBatch(outputBatch);
       outputVectorAssignRowMap.put(outputOI, va);
     }
 
-    va.assignRow(outputBatch.size, values);
+    va.assignRow(outputBatch, outputBatch.size, values);
 
     ++outputBatch.size;
     if (outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {

http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java
index 3d0b571..51d1436 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java
@@ -46,7 +46,7 @@ public class VectorSparkHashTableSinkOperator extends SparkHashTableSinkOperator
 
   private transient boolean firstBatch;
 
-  private transient VectorExtractRowDynBatch vectorExtractRowDynBatch;
+  private transient VectorExtractRow vectorExtractRow;
 
   protected transient Object[] singleRow;
 
@@ -82,28 +82,26 @@ public class VectorSparkHashTableSinkOperator extends SparkHashTableSinkOperator
     VectorizedRowBatch batch = (VectorizedRowBatch) row;
 
     if (firstBatch) {
-      vectorExtractRowDynBatch = new VectorExtractRowDynBatch();
-      vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns());
+      vectorExtractRow = new VectorExtractRow();
+      vectorExtractRow.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns());
 
-      singleRow = new Object[vectorExtractRowDynBatch.getCount()];
+      singleRow = new Object[vectorExtractRow.getCount()];
 
       firstBatch = false;
     }
-    vectorExtractRowDynBatch.setBatchOnEntry(batch);
+
     if (batch.selectedInUse) {
       int selected[] = batch.selected;
       for (int logical = 0 ; logical < batch.size; logical++) {
         int batchIndex = selected[logical];
-        vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+        vectorExtractRow.extractRow(batch, batchIndex, singleRow);
         super.process(singleRow, tag);
       }
     } else {
       for (int batchIndex = 0 ; batchIndex < batch.size; batchIndex++) {
-        vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+        vectorExtractRow.extractRow(batch, batchIndex, singleRow);
         super.process(singleRow, tag);
       }
     }
-
-    vectorExtractRowDynBatch.forgetBatchOnExit();
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java
index e7ac531..2dc4d0e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java
@@ -42,7 +42,7 @@ public class VectorSparkPartitionPruningSinkOperator extends SparkPartitionPruni
 
   protected transient boolean firstBatch;
 
-  protected transient VectorExtractRowDynBatch vectorExtractRowDynBatch;
+  protected transient VectorExtractRow vectorExtractRow;
 
   protected transient Object[] singleRow;
 
@@ -77,27 +77,24 @@ public class VectorSparkPartitionPruningSinkOperator extends SparkPartitionPruni
   public void process(Object data, int tag) throws HiveException {
     VectorizedRowBatch batch = (VectorizedRowBatch) data;
     if (firstBatch) {
-      vectorExtractRowDynBatch = new VectorExtractRowDynBatch();
-      vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0],
+      vectorExtractRow = new VectorExtractRow();
+      vectorExtractRow.init((StructObjectInspector) inputObjInspectors[0],
           vContext.getProjectedColumns());
-      singleRow = new Object[vectorExtractRowDynBatch.getCount()];
+      singleRow = new Object[vectorExtractRow.getCount()];
       firstBatch = false;
     }
 
-    vectorExtractRowDynBatch.setBatchOnEntry(batch);
     ObjectInspector rowInspector = inputObjInspectors[0];
     try {
       Writable writableRow;
       for (int logical = 0; logical < batch.size; logical++) {
         int batchIndex = batch.selectedInUse ? batch.selected[logical] : logical;
-        vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+        vectorExtractRow.extractRow(batch, batchIndex, singleRow);
         writableRow = serializer.serialize(singleRow, rowInspector);
         writableRow.write(buffer);
       }
     } catch (Exception e) {
       throw new HiveException(e);
     }
-
-    vectorExtractRowDynBatch.forgetBatchOnExit();
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
index 86025ef..5c55011 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
@@ -68,13 +68,11 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUD
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxLong;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxString;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxTimestamp;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxIntervalDayTime;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinDouble;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinLong;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinString;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinTimestamp;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinIntervalDayTime;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFStdPopDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFStdPopDouble;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFStdPopLong;
@@ -171,10 +169,6 @@ public class VectorizationContext {
   public VectorizationContext(String contextName, List<String> initialColumnNames) {
     this.contextName = contextName;
     level = 0;
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("VectorizationContext consructor contextName " + contextName + " level "
-          + level + " initialColumnNames " + initialColumnNames);
-    }
     this.initialColumnNames = initialColumnNames;
     this.projectionColumnNames = initialColumnNames;
 
@@ -195,9 +189,6 @@ public class VectorizationContext {
   public VectorizationContext(String contextName) {
     this.contextName = contextName;
     level = 0;
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("VectorizationContext consructor contextName " + contextName + " level " + level);
-    }
     initialColumnNames = new ArrayList<String>();
     projectedColumns = new ArrayList<Integer>();
     projectionColumnNames = new ArrayList<String>();
@@ -213,7 +204,6 @@ public class VectorizationContext {
   public VectorizationContext(String contextName, VectorizationContext vContext) {
     this.contextName = contextName;
     level = vContext.level + 1;
-    LOG.info("VectorizationContext consructor reference contextName " + contextName + " level " + level);
     this.initialColumnNames = vContext.initialColumnNames;
     this.projectedColumns = new ArrayList<Integer>();
     this.projectionColumnNames = new ArrayList<String>();
@@ -485,7 +475,7 @@ public class VectorizationContext {
       throw new HiveException("Could not vectorize expression: "+exprDesc.getName());
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Input Expression = " + exprDesc.getTypeInfo()
+      LOG.debug("Input Expression = " + exprDesc.toString()
           + ", Vectorized Expression = " + ve.toString());
     }
     return ve;

http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index be04da8..9471e66 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
 import java.io.IOException;
+import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -28,8 +29,11 @@ import java.util.Map;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
 import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
@@ -49,12 +53,14 @@ import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
@@ -63,6 +69,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -70,6 +77,7 @@ import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hive.common.util.DateUtils;
 
 public class VectorizedBatchUtil {
@@ -638,6 +646,47 @@ public class VectorizedBatchUtil {
     return newBatch;
   }
 
+  public static Writable getPrimitiveWritable(PrimitiveCategory primitiveCategory) { // Returns a newly allocated, default-valued Writable for the given primitive category (null for VOID).
+    switch (primitiveCategory) {
+    case VOID:
+      return null; // VOID is the only category that yields no Writable object
+    case BOOLEAN:
+      return new BooleanWritable(false);
+    case BYTE:
+      return new ByteWritable((byte) 0);
+    case SHORT:
+      return new ShortWritable((short) 0);
+    case INT:
+      return new IntWritable(0);
+    case LONG:
+      return new LongWritable(0);
+    case TIMESTAMP:
+      return new TimestampWritable(new Timestamp(0)); // java.sql.Timestamp at epoch millisecond 0
+    case DATE:
+      return new DateWritable(new Date(0)); // java.sql.Date at epoch millisecond 0
+    case FLOAT:
+      return new FloatWritable(0);
+    case DOUBLE:
+      return new DoubleWritable(0);
+    case BINARY:
+      return new BytesWritable(ArrayUtils.EMPTY_BYTE_ARRAY); // zero-length payload
+    case STRING:
+      return new Text(ArrayUtils.EMPTY_BYTE_ARRAY); // empty Text built from a zero-length byte array
+    case VARCHAR:
+      return new HiveVarcharWritable(new HiveVarchar(StringUtils.EMPTY, -1)); // empty value; NOTE(review): max length -1 presumably means "no length limit" -- confirm against HiveVarchar
+    case CHAR:
+      return new HiveCharWritable(new HiveChar(StringUtils.EMPTY, -1)); // empty value; NOTE(review): max length -1 presumably disables padding/truncation -- confirm against HiveChar
+    case DECIMAL:
+      return new HiveDecimalWritable(); // no-arg constructor; initial value is whatever HiveDecimalWritable defaults to
+    case INTERVAL_YEAR_MONTH:
+      return new HiveIntervalYearMonthWritable(); // no-arg constructor, as for DECIMAL
+    case INTERVAL_DAY_TIME:
+      return new HiveIntervalDayTimeWritable(); // no-arg constructor, as for DECIMAL
+    default:
+      throw new RuntimeException("Primitive category " + primitiveCategory.name() + " not supported"); // any category without a case above is rejected
+    }
+  }
+
   public static String displayBytes(byte[] bytes, int start, int length) {
     StringBuilder sb = new StringBuilder();
     for (int i = start; i < start + length; i++) {

http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
index 5cbace4..6a3d64b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
@@ -186,7 +186,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
         int length = byteSegmentRef.getLength();
         smallTableVectorDeserializeRow.setBytes(bytes, offset, length);
 
-        smallTableVectorDeserializeRow.deserializeByValue(batch, batchIndex);
+        smallTableVectorDeserializeRow.deserialize(batch, batchIndex);
       }
 
       // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, "generateHashMapResultSingleValue big table");
@@ -253,7 +253,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
           int length = byteSegmentRef.getLength();
           smallTableVectorDeserializeRow.setBytes(bytes, offset, length);
 
-          smallTableVectorDeserializeRow.deserializeByValue(overflowBatch, overflowBatch.size);
+          smallTableVectorDeserializeRow.deserialize(overflowBatch, overflowBatch.size);
         }
 
         // VectorizedBatchUtil.debugDisplayOneRow(overflowBatch, overflowBatch.size, "generateHashMapResultMultiValue overflow");
@@ -304,7 +304,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
           int length = byteSegmentRef.getLength();
           smallTableVectorDeserializeRow.setBytes(bytes, offset, length);
 
-          smallTableVectorDeserializeRow.deserializeByValue(overflowBatch, overflowBatch.DEFAULT_SIZE);
+          smallTableVectorDeserializeRow.deserialize(overflowBatch, overflowBatch.DEFAULT_SIZE);
         }
 
         overflowBatch.size++;
@@ -545,7 +545,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
 //      LOG.debug(CLASS_NAME + " reProcessBigTable serialized row #" + rowCount + ", offset " + offset + ", length " + length);
 
         bigTableVectorDeserializeRow.setBytes(bytes, offset, length);
-        bigTableVectorDeserializeRow.deserializeByValue(spillReplayBatch, spillReplayBatch.size);
+        bigTableVectorDeserializeRow.deserialize(spillReplayBatch, spillReplayBatch.size);
         spillReplayBatch.size++;
 
         if (spillReplayBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {

http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashUtil.java
index 1877f14..d02d1db 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashUtil.java
@@ -30,19 +30,19 @@ public class VectorMapJoinFastLongHashUtil {
     long key = 0;
     switch (hashTableKeyType) {
     case BOOLEAN:
-      key = (keyBinarySortableDeserializeRead.readBoolean() ? 1 : 0);
+      key = (keyBinarySortableDeserializeRead.currentBoolean ? 1 : 0);
       break;
     case BYTE:
-      key = (long) keyBinarySortableDeserializeRead.readByte();
+      key = (long) keyBinarySortableDeserializeRead.currentByte;
       break;
     case SHORT:
-      key = (long) keyBinarySortableDeserializeRead.readShort();
+      key = (long) keyBinarySortableDeserializeRead.currentShort;
       break;
     case INT:
-      key = (long) keyBinarySortableDeserializeRead.readInt();
+      key = (long) keyBinarySortableDeserializeRead.currentInt;
       break;
     case LONG:
-      key = keyBinarySortableDeserializeRead.readLong();
+      key = keyBinarySortableDeserializeRead.currentLong;
       break;
     default:
       throw new RuntimeException("Unexpected hash table key type " + hashTableKeyType.name());

http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java
index adb8044..985fb1c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
-import org.apache.hadoop.hive.serde2.fast.DeserializeRead.ReadStringResults;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.BytesWritable;
@@ -40,8 +39,6 @@ public class VectorMapJoinFastStringCommon {
 
   private BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
 
-  private ReadStringResults readStringResults;
-
   public void adaptPutRow(VectorMapJoinFastBytesHashTable hashTable,
           BytesWritable currentKey, BytesWritable currentValue) throws HiveException, IOException {
 
@@ -51,9 +48,11 @@ public class VectorMapJoinFastStringCommon {
     if (keyBinarySortableDeserializeRead.readCheckNull()) {
       return;
     }
-    keyBinarySortableDeserializeRead.readString(readStringResults);
 
-    hashTable.add(readStringResults.bytes, readStringResults.start, readStringResults.length,
+    hashTable.add(
+        keyBinarySortableDeserializeRead.currentBytes,
+        keyBinarySortableDeserializeRead.currentBytesStart,
+        keyBinarySortableDeserializeRead.currentBytesLength,
         currentValue);
   }
 
@@ -61,6 +60,5 @@ public class VectorMapJoinFastStringCommon {
     this.isOuterJoin = isOuterJoin;
     PrimitiveTypeInfo[] primitiveTypeInfos = { TypeInfoFactory.stringTypeInfo };
     keyBinarySortableDeserializeRead = new BinarySortableDeserializeRead(primitiveTypeInfos);
-    readStringResults = keyBinarySortableDeserializeRead.createReadStringResults();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 46a5413..cfedf35 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -205,8 +205,8 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     if (!HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon())) {
       return inputFormat; // LLAP not enabled, no-op.
     }
-    boolean isSupported = inputFormat instanceof LlapWrappableInputFormatInterface,
-        isVectorized = Utilities.isVectorMode(conf);
+    boolean isSupported = inputFormat instanceof LlapWrappableInputFormatInterface;
+    boolean isVectorized = Utilities.getUseVectorizedInputFileFormat(conf);
     if (!isSupported || !isVectorized) {
       LOG.info("Not using llap for " + inputFormat + ": supported = " + isSupported
           + ", vectorized = " + isVectorized);
@@ -225,7 +225,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
   }
 
   public static boolean canWrapAnyForLlap(Configuration conf, MapWork mapWork) {
-    return Utilities.isVectorMode(conf, mapWork);
+    return Utilities.getUseVectorizedInputFileFormat(conf, mapWork);
   }
 
   public static boolean canWrapForLlap(Class<? extends InputFormat> inputFormatClass) {

http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/io/NullRowsInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/NullRowsInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/NullRowsInputFormat.java
index c53d149..80858a9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/NullRowsInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/NullRowsInputFormat.java
@@ -72,7 +72,7 @@ public class NullRowsInputFormat implements InputFormat<NullWritable, NullWritab
     private boolean addPartitionCols = true;
 
     public NullRowsRecordReader(Configuration conf, InputSplit split) throws IOException {
-      boolean isVectorMode = Utilities.isVectorMode(conf);
+      boolean isVectorMode = Utilities.getUseVectorizedInputFileFormat(conf);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Null record reader in " + (isVectorMode ? "" : "non-") + "vector mode");
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index fcb8ca4..33fe3b6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -452,7 +452,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
                                List<FileStatus> files
                               ) throws IOException {
 
-    if (Utilities.isVectorMode(conf)) {
+    if (Utilities.getUseVectorizedInputFileFormat(conf)) {
       return new VectorizedOrcInputFormat().validateInput(fs, conf, files);
     }
 
@@ -1640,7 +1640,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   public org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct>
   getRecordReader(InputSplit inputSplit, JobConf conf,
                   Reporter reporter) throws IOException {
-    boolean vectorMode = Utilities.isVectorMode(conf);
+    boolean vectorMode = Utilities.getUseVectorizedInputFileFormat(conf);
     boolean isAcidRead = isAcidRead(conf, inputSplit);
 
     if (!isAcidRead) {

http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
index a4e35cb..5b65e5c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
@@ -59,7 +59,7 @@ public class MapredParquetInputFormat extends FileInputFormat<NullWritable, Arra
       final org.apache.hadoop.mapred.Reporter reporter
       ) throws IOException {
     try {
-      if (Utilities.isVectorMode(job)) {
+      if (Utilities.getUseVectorizedInputFileFormat(job)) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Using vectorized record reader");
         }