You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/24 23:51:54 UTC

[20/54] [abbrv] hive git commit: HIVE-16207: Add support for Complex Types in Fast SerDe (Teddy Choi, reviewed by Matt McCline)

HIVE-16207: Add support for Complex Types in Fast SerDe (Teddy Choi, reviewed by Matt McCline)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d467e172
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d467e172
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d467e172

Branch: refs/heads/hive-14535
Commit: d467e172262c23b97e1d55e35798ba530cae5972
Parents: 189d454
Author: Teddy Choi <tc...@hortonworks.com>
Authored: Thu May 18 17:43:36 2017 -0500
Committer: Matt McCline <mm...@hortonworks.com>
Committed: Thu May 18 17:43:36 2017 -0500

----------------------------------------------------------------------
 .../hive/ql/exec/vector/VectorAssignRow.java    |  220 ++--
 .../ql/exec/vector/VectorDeserializeRow.java    |  912 ++++++++++-----
 .../hive/ql/exec/vector/VectorExtractRow.java   |  282 +++--
 .../hive/ql/exec/vector/VectorSerializeRow.java |  373 +++++--
 .../ql/exec/vector/TestVectorRowObject.java     |   13 +-
 .../hive/ql/exec/vector/TestVectorSerDeRow.java |  498 ++-------
 .../ql/exec/vector/VectorRandomRowSource.java   |  776 +++++++++----
 .../hive/ql/exec/vector/VectorVerifyFast.java   |  698 ++++++++++++
 .../mapjoin/fast/CheckFastRowHashMap.java       |   50 +-
 .../fast/TestVectorMapJoinFastRowHashMap.java   |  143 +--
 .../exec/vector/mapjoin/fast/VerifyFastRow.java |  874 ++++++++++-----
 .../fast/BinarySortableDeserializeRead.java     |  299 ++++-
 .../fast/BinarySortableSerializeWrite.java      |  295 +++--
 .../hive/serde2/fast/DeserializeRead.java       |  114 +-
 .../hadoop/hive/serde2/fast/SerializeWrite.java |   30 +
 .../hive/serde2/io/TimestampWritable.java       |    4 +-
 .../hadoop/hive/serde2/lazy/VerifyLazy.java     |  444 ++++++++
 .../lazy/fast/LazySimpleDeserializeRead.java    | 1034 ++++++++++++++----
 .../lazy/fast/LazySimpleSerializeWrite.java     |  320 +++---
 .../fast/LazyBinaryDeserializeRead.java         |  647 +++++++----
 .../fast/LazyBinarySerializeWrite.java          |  791 ++++++--------
 .../StandardUnionObjectInspector.java           |   25 +
 .../hive/serde2/SerdeRandomRowSource.java       |  627 +++++++++--
 .../apache/hadoop/hive/serde2/VerifyFast.java   |  877 ++++++++++-----
 .../hive/serde2/binarysortable/MyTestClass.java |   24 +-
 .../binarysortable/TestBinarySortableFast.java  |  121 +-
 .../hive/serde2/lazy/TestLazySimpleFast.java    |  171 +--
 .../serde2/lazybinary/TestLazyBinaryFast.java   |  113 +-
 28 files changed, 7424 insertions(+), 3351 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d467e172/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAssignRow.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAssignRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAssignRow.java
index 9c84937..b0d1c75 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAssignRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAssignRow.java
@@ -21,7 +21,13 @@ package org.apache.hadoop.hive.ql.exec.vector;
 import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.type.HiveChar;
@@ -89,11 +95,8 @@ public class VectorAssignRow {
                 // Assigning can be a subset of columns, so this is the projection --
                 // the batch column numbers.
 
-  Category[] targetCategories;
-                // The data type category of each column being assigned.
-
-  PrimitiveCategory[] targetPrimitiveCategories;
-                // The data type primitive category of each column being assigned.
+  TypeInfo[] targetTypeInfos;
+                // The type info of each column being assigned.
 
   int[] maxLengths;
                 // For the CHAR and VARCHAR data types, the maximum character length of
@@ -117,8 +120,7 @@ public class VectorAssignRow {
   private void allocateArrays(int count) {
     isConvert = new boolean[count];
     projectionColumnNums = new int[count];
-    targetCategories = new Category[count];
-    targetPrimitiveCategories = new PrimitiveCategory[count];
+    targetTypeInfos = new TypeInfo[count];
     maxLengths = new int[count];
   }
 
@@ -136,12 +138,10 @@ public class VectorAssignRow {
   private void initTargetEntry(int logicalColumnIndex, int projectionColumnNum, TypeInfo typeInfo) {
     isConvert[logicalColumnIndex] = false;
     projectionColumnNums[logicalColumnIndex] = projectionColumnNum;
-    Category category = typeInfo.getCategory();
-    targetCategories[logicalColumnIndex] = category;
-    if (category == Category.PRIMITIVE) {
-      PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
-      PrimitiveCategory primitiveCategory = primitiveTypeInfo.getPrimitiveCategory();
-      targetPrimitiveCategories[logicalColumnIndex] = primitiveCategory;
+    targetTypeInfos[logicalColumnIndex] = typeInfo;
+    if (typeInfo.getCategory() == Category.PRIMITIVE) {
+      final PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
+      final PrimitiveCategory primitiveCategory = primitiveTypeInfo.getPrimitiveCategory();
       switch (primitiveCategory) {
       case CHAR:
         maxLengths[logicalColumnIndex] = ((CharTypeInfo) primitiveTypeInfo).getLength();
@@ -162,15 +162,16 @@ public class VectorAssignRow {
    */
   private void initConvertSourceEntry(int logicalColumnIndex, TypeInfo convertSourceTypeInfo) {
     isConvert[logicalColumnIndex] = true;
-    Category convertSourceCategory = convertSourceTypeInfo.getCategory();
+    final Category convertSourceCategory = convertSourceTypeInfo.getCategory();
     if (convertSourceCategory == Category.PRIMITIVE) {
-      PrimitiveTypeInfo convertSourcePrimitiveTypeInfo = (PrimitiveTypeInfo) convertSourceTypeInfo;
+      final PrimitiveTypeInfo convertSourcePrimitiveTypeInfo = (PrimitiveTypeInfo) convertSourceTypeInfo;
       convertSourcePrimitiveObjectInspectors[logicalColumnIndex] =
         PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
             convertSourcePrimitiveTypeInfo);
 
       // These need to be based on the target.
-      PrimitiveCategory targetPrimitiveCategory = targetPrimitiveCategories[logicalColumnIndex];
+      final PrimitiveCategory targetPrimitiveCategory =
+          ((PrimitiveTypeInfo) targetTypeInfos[logicalColumnIndex]).getPrimitiveCategory();
       switch (targetPrimitiveCategory) {
       case DATE:
         convertTargetWritables[logicalColumnIndex] = new DateWritable();
@@ -191,17 +192,17 @@ public class VectorAssignRow {
   public void init(StructObjectInspector structObjectInspector, List<Integer> projectedColumns)
       throws HiveException {
 
-    List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs();
+    final List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs();
     final int count = fields.size();
     allocateArrays(count);
 
     for (int i = 0; i < count; i++) {
 
-      int projectionColumnNum = projectedColumns.get(i);
+      final int projectionColumnNum = projectedColumns.get(i);
 
-      StructField field = fields.get(i);
-      ObjectInspector fieldInspector = field.getFieldObjectInspector();
-      TypeInfo typeInfo =
+      final StructField field = fields.get(i);
+      final ObjectInspector fieldInspector = field.getFieldObjectInspector();
+      final TypeInfo typeInfo =
           TypeInfoUtils.getTypeInfoFromTypeString(fieldInspector.getTypeName());
 
       initTargetEntry(i, projectionColumnNum, typeInfo);
@@ -214,15 +215,15 @@ public class VectorAssignRow {
    */
   public void init(StructObjectInspector structObjectInspector) throws HiveException {
 
-    List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs();
+    final List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs();
     final int count = fields.size();
     allocateArrays(count);
 
     for (int i = 0; i < count; i++) {
 
-      StructField field = fields.get(i);
-      ObjectInspector fieldInspector = field.getFieldObjectInspector();
-      TypeInfo typeInfo =
+      final StructField field = fields.get(i);
+      final ObjectInspector fieldInspector = field.getFieldObjectInspector();
+      final TypeInfo typeInfo =
           TypeInfoUtils.getTypeInfoFromTypeString(fieldInspector.getTypeName());
 
       initTargetEntry(i, i, typeInfo);
@@ -240,7 +241,7 @@ public class VectorAssignRow {
 
     for (int i = 0; i < count; i++) {
 
-      TypeInfo typeInfo =
+      final TypeInfo typeInfo =
           TypeInfoUtils.getTypeInfoFromTypeString(typeNames.get(i));
 
       initTargetEntry(i, i, typeInfo);
@@ -268,14 +269,14 @@ public class VectorAssignRow {
   public int initConversion(TypeInfo[] sourceTypeInfos, TypeInfo[] targetTypeInfos,
       boolean[] columnsToIncludeTruncated) {
 
-    int targetColumnCount;
+    final int targetColumnCount;
     if (columnsToIncludeTruncated == null) {
       targetColumnCount = targetTypeInfos.length;
     } else {
       targetColumnCount = Math.min(targetTypeInfos.length, columnsToIncludeTruncated.length);
     }
 
-    int sourceColumnCount = Math.min(sourceTypeInfos.length, targetColumnCount);
+    final int sourceColumnCount = Math.min(sourceTypeInfos.length, targetColumnCount);
 
     allocateArrays(sourceColumnCount);
     allocateConvertArrays(sourceColumnCount);
@@ -287,14 +288,14 @@ public class VectorAssignRow {
         // Field not included in query.
 
       } else {
-        TypeInfo targetTypeInfo = targetTypeInfos[i];
+        final TypeInfo targetTypeInfo = targetTypeInfos[i];
 
         if (targetTypeInfo.getCategory() != ObjectInspector.Category.PRIMITIVE) {
 
           // For now, we don't have an assigner for complex types...
 
         } else {
-          TypeInfo sourceTypeInfo = sourceTypeInfos[i];
+          final TypeInfo sourceTypeInfo = sourceTypeInfos[i];
 
           if (!sourceTypeInfo.equals(targetTypeInfo)) {
 
@@ -333,75 +334,83 @@ public class VectorAssignRow {
    * @param logicalColumnIndex
    * @param object    The row column object whose type is the target data type.
    */
-  public void assignRowColumn(VectorizedRowBatch batch, int batchIndex, int logicalColumnIndex,
-      Object object) {
-    Category targetCategory = targetCategories[logicalColumnIndex];
-    if (targetCategory == null) {
+  public void assignRowColumn(
+      VectorizedRowBatch batch, int batchIndex, int logicalColumnIndex, Object object) {
+
+    final int projectionColumnNum = projectionColumnNums[logicalColumnIndex];
+    final TypeInfo targetTypeInfo = targetTypeInfos[logicalColumnIndex];
+    if (targetTypeInfo == null || targetTypeInfo.getCategory() == null) {
       /*
        * This is a column that we don't want (i.e. not included) -- we are done.
        */
       return;
     }
-    final int projectionColumnNum = projectionColumnNums[logicalColumnIndex];
+    assignRowColumn(batch.cols[projectionColumnNum], batchIndex, targetTypeInfo, object);
+  }
+
+  private void assignRowColumn(
+      ColumnVector columnVector, int batchIndex, TypeInfo targetTypeInfo, Object object) {
+
     if (object == null) {
-      VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex);
+      VectorizedBatchUtil.setNullColIsNullValue(columnVector, batchIndex);
       return;
     }
-    switch (targetCategory) {
+    switch (targetTypeInfo.getCategory()) {
     case PRIMITIVE:
       {
-        PrimitiveCategory targetPrimitiveCategory = targetPrimitiveCategories[logicalColumnIndex];
+        final PrimitiveCategory targetPrimitiveCategory =
+            ((PrimitiveTypeInfo) targetTypeInfo).getPrimitiveCategory();
         switch (targetPrimitiveCategory) {
         case VOID:
-          VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex);
+          VectorizedBatchUtil.setNullColIsNullValue(columnVector, batchIndex);
           return;
         case BOOLEAN:
-          ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
+          ((LongColumnVector) columnVector).vector[batchIndex] =
               (((BooleanWritable) object).get() ? 1 : 0);
           break;
         case BYTE:
-          ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
+          ((LongColumnVector) columnVector).vector[batchIndex] =
              ((ByteWritable) object).get();
           break;
         case SHORT:
-          ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
+          ((LongColumnVector) columnVector).vector[batchIndex] =
               ((ShortWritable) object).get();
           break;
         case INT:
-          ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
+          ((LongColumnVector) columnVector).vector[batchIndex] =
               ((IntWritable) object).get();
           break;
         case LONG:
-          ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
+          ((LongColumnVector) columnVector).vector[batchIndex] =
               ((LongWritable) object).get();
           break;
         case TIMESTAMP:
-          ((TimestampColumnVector) batch.cols[projectionColumnNum]).set(
+          ((TimestampColumnVector) columnVector).set(
               batchIndex, ((TimestampWritable) object).getTimestamp());
           break;
         case DATE:
-          ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
+          ((LongColumnVector) columnVector).vector[batchIndex] =
              ((DateWritable) object).getDays();
           break;
         case FLOAT:
-          ((DoubleColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
+          ((DoubleColumnVector) columnVector).vector[batchIndex] =
               ((FloatWritable) object).get();
           break;
         case DOUBLE:
-          ((DoubleColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
+          ((DoubleColumnVector) columnVector).vector[batchIndex] =
               ((DoubleWritable) object).get();
           break;
         case BINARY:
           {
             BytesWritable bw = (BytesWritable) object;
-            ((BytesColumnVector) batch.cols[projectionColumnNum]).setVal(
+            ((BytesColumnVector) columnVector).setVal(
                 batchIndex, bw.getBytes(), 0, bw.getLength());
           }
           break;
         case STRING:
           {
             Text tw = (Text) object;
-            ((BytesColumnVector) batch.cols[projectionColumnNum]).setVal(
+            ((BytesColumnVector) columnVector).setVal(
                 batchIndex, tw.getBytes(), 0, tw.getLength());
           }
           break;
@@ -420,7 +429,7 @@ public class VectorAssignRow {
             // TODO: HIVE-13624 Do we need maxLength checking?
 
             byte[] bytes = hiveVarchar.getValue().getBytes();
-            ((BytesColumnVector) batch.cols[projectionColumnNum]).setVal(
+            ((BytesColumnVector) columnVector).setVal(
                 batchIndex, bytes, 0, bytes.length);
           }
           break;
@@ -440,25 +449,25 @@ public class VectorAssignRow {
 
             // We store CHAR in vector row batch with padding stripped.
             byte[] bytes = hiveChar.getStrippedValue().getBytes();
-            ((BytesColumnVector) batch.cols[projectionColumnNum]).setVal(
+            ((BytesColumnVector) columnVector).setVal(
                 batchIndex, bytes, 0, bytes.length);
           }
           break;
         case DECIMAL:
           if (object instanceof HiveDecimal) {
-            ((DecimalColumnVector) batch.cols[projectionColumnNum]).set(
+            ((DecimalColumnVector) columnVector).set(
                 batchIndex, (HiveDecimal) object);
           } else {
-            ((DecimalColumnVector) batch.cols[projectionColumnNum]).set(
+            ((DecimalColumnVector) columnVector).set(
                 batchIndex, (HiveDecimalWritable) object);
           }
           break;
         case INTERVAL_YEAR_MONTH:
-          ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
+          ((LongColumnVector) columnVector).vector[batchIndex] =
               ((HiveIntervalYearMonthWritable) object).getHiveIntervalYearMonth().getTotalMonths();
           break;
         case INTERVAL_DAY_TIME:
-          ((IntervalDayTimeColumnVector) batch.cols[projectionColumnNum]).set(
+          ((IntervalDayTimeColumnVector) columnVector).set(
               batchIndex, ((HiveIntervalDayTimeWritable) object).getHiveIntervalDayTime());
           break;
         default:
@@ -467,14 +476,82 @@ public class VectorAssignRow {
         }
       }
       break;
+    case LIST:
+      {
+        final ListColumnVector listColumnVector = (ListColumnVector) columnVector;
+        final ListTypeInfo listTypeInfo = (ListTypeInfo) targetTypeInfo;
+        final TypeInfo elementTypeInfo = listTypeInfo.getListElementTypeInfo();
+        final List list = (List) object;
+        final int size = list.size();
+        final int childCount = listColumnVector.childCount;
+        listColumnVector.offsets[batchIndex] = childCount;
+        listColumnVector.lengths[batchIndex] = size;
+        listColumnVector.childCount = childCount + size;
+        listColumnVector.child.ensureSize(childCount + size, true);
+
+        for (int i = 0; i < size; i++) {
+          assignRowColumn(listColumnVector.child, childCount + i, elementTypeInfo, list.get(i));
+        }
+      }
+      break;
+    case MAP:
+      {
+        final MapColumnVector mapColumnVector = (MapColumnVector) columnVector;
+        final MapTypeInfo mapTypeInfo = (MapTypeInfo) targetTypeInfo;
+        final Map<Object, Object> map = (Map<Object, Object>) object;
+        final int size = map.size();
+        int childCount = mapColumnVector.childCount;
+        mapColumnVector.offsets[batchIndex] = childCount;
+        mapColumnVector.lengths[batchIndex] = size;
+        mapColumnVector.keys.ensureSize(childCount + size, true);
+        mapColumnVector.values.ensureSize(childCount + size, true);
+
+        for (Map.Entry<Object, Object> entry : map.entrySet()) {
+          assignRowColumn(mapColumnVector.keys, childCount, mapTypeInfo.getMapKeyTypeInfo(), entry.getKey());
+          assignRowColumn(mapColumnVector.values, childCount, mapTypeInfo.getMapValueTypeInfo(), entry.getValue());
+          childCount++;
+        }
+        mapColumnVector.childCount = childCount;
+      }
+      break;
+    case STRUCT:
+      {
+        final StructColumnVector structColumnVector = (StructColumnVector) columnVector;
+        final StructTypeInfo structTypeInfo = (StructTypeInfo) targetTypeInfo;
+        final List<TypeInfo> fieldStructTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+        final int size = fieldStructTypeInfos.size();
+        if (object instanceof List) {
+          final List struct = (List) object;
+          for (int i = 0; i < size; i++) {
+            assignRowColumn(structColumnVector.fields[i], batchIndex, fieldStructTypeInfos.get(i), struct.get(i));
+          }
+        } else {
+          final Object[] array = (Object[]) object;
+          for (int i = 0; i < size; i++) {
+            assignRowColumn(structColumnVector.fields[i], batchIndex, fieldStructTypeInfos.get(i), array[i]);
+          }
+        }
+      }
+      break;
+    case UNION:
+      {
+        final StandardUnion union = (StandardUnion) object;
+        final UnionColumnVector unionColumnVector = (UnionColumnVector) columnVector;
+        final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) targetTypeInfo;
+        final List<TypeInfo> objectTypeInfos = unionTypeInfo.getAllUnionObjectTypeInfos();
+        final byte tag = union.getTag();
+        unionColumnVector.tags[batchIndex] = tag;
+        assignRowColumn(unionColumnVector.fields[tag], batchIndex, objectTypeInfos.get(tag), union.getObject());
+      }
+      break;
     default:
-      throw new RuntimeException("Category " + targetCategory.name() + " not supported");
+      throw new RuntimeException("Category " + targetTypeInfo.getCategory().name() + " not supported");
     }
 
     /*
      * We always set the null flag to false when there is a value.
      */
-    batch.cols[projectionColumnNum].isNull[batchIndex] = false;
+    columnVector.isNull[batchIndex] = false;
   }
 
   /**
@@ -493,7 +570,7 @@ public class VectorAssignRow {
   public void assignConvertRowColumn(VectorizedRowBatch batch, int batchIndex,
       int logicalColumnIndex, Object object) {
     Preconditions.checkState(isConvert[logicalColumnIndex]);
-    Category targetCategory = targetCategories[logicalColumnIndex];
+    final Category targetCategory = targetTypeInfos[logicalColumnIndex].getCategory();
     if (targetCategory == null) {
       /*
        * This is a column that we don't want (i.e. not included) -- we are done.
@@ -508,7 +585,8 @@ public class VectorAssignRow {
     try {
       switch (targetCategory) {
       case PRIMITIVE:
-        PrimitiveCategory targetPrimitiveCategory = targetPrimitiveCategories[logicalColumnIndex];
+        final PrimitiveCategory targetPrimitiveCategory =
+            ((PrimitiveTypeInfo) targetTypeInfos[logicalColumnIndex]).getPrimitiveCategory();
         switch (targetPrimitiveCategory) {
         case VOID:
           VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex);
@@ -540,7 +618,7 @@ public class VectorAssignRow {
           break;
         case TIMESTAMP:
           {
-            Timestamp timestamp =
+            final Timestamp timestamp =
               PrimitiveObjectInspectorUtils.getTimestamp(
                   object, convertSourcePrimitiveObjectInspectors[logicalColumnIndex]);
             if (timestamp == null) {
@@ -553,13 +631,13 @@ public class VectorAssignRow {
           break;
         case DATE:
           {
-            Date date = PrimitiveObjectInspectorUtils.getDate(
+            final Date date = PrimitiveObjectInspectorUtils.getDate(
                 object, convertSourcePrimitiveObjectInspectors[logicalColumnIndex]);
             if (date == null) {
               VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex);
               return;
             }
-            DateWritable dateWritable = (DateWritable) convertTargetWritables[logicalColumnIndex];
+            final DateWritable dateWritable = (DateWritable) convertTargetWritables[logicalColumnIndex];
             dateWritable.set(date);
             ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
                 dateWritable.getDays();
@@ -577,7 +655,7 @@ public class VectorAssignRow {
           break;
         case BINARY:
           {
-            BytesWritable bytesWritable =
+            final BytesWritable bytesWritable =
                 PrimitiveObjectInspectorUtils.getBinary(
                     object, convertSourcePrimitiveObjectInspectors[logicalColumnIndex]);
             if (bytesWritable == null) {
@@ -590,7 +668,7 @@ public class VectorAssignRow {
           break;
         case STRING:
           {
-            String string = PrimitiveObjectInspectorUtils.getString(
+            final String string = PrimitiveObjectInspectorUtils.getString(
                 object, convertSourcePrimitiveObjectInspectors[logicalColumnIndex]);
             if (string == null) {
               VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex);
@@ -606,7 +684,7 @@ public class VectorAssignRow {
           {
             // UNDONE: Performance problem with conversion to String, then bytes...
 
-            HiveVarchar hiveVarchar =
+            final HiveVarchar hiveVarchar =
                 PrimitiveObjectInspectorUtils.getHiveVarchar(
                     object, convertSourcePrimitiveObjectInspectors[logicalColumnIndex]);
             if (hiveVarchar == null) {
@@ -625,7 +703,7 @@ public class VectorAssignRow {
           {
             // UNDONE: Performance problem with conversion to String, then bytes...
 
-            HiveChar hiveChar =
+            final HiveChar hiveChar =
                 PrimitiveObjectInspectorUtils.getHiveChar(
                     object, convertSourcePrimitiveObjectInspectors[logicalColumnIndex]);
             if (hiveChar == null) {
@@ -636,14 +714,14 @@ public class VectorAssignRow {
 
             // TODO: Do we need maxLength checking?
 
-            byte[] bytes = hiveChar.getStrippedValue().getBytes();
+            final byte[] bytes = hiveChar.getStrippedValue().getBytes();
             ((BytesColumnVector) batch.cols[projectionColumnNum]).setVal(
                 batchIndex, bytes, 0, bytes.length);
           }
           break;
         case DECIMAL:
           {
-            HiveDecimal hiveDecimal =
+            final HiveDecimal hiveDecimal =
                 PrimitiveObjectInspectorUtils.getHiveDecimal(
                     object, convertSourcePrimitiveObjectInspectors[logicalColumnIndex]);
             if (hiveDecimal == null) {
@@ -656,7 +734,7 @@ public class VectorAssignRow {
           break;
         case INTERVAL_YEAR_MONTH:
           {
-            HiveIntervalYearMonth intervalYearMonth =
+            final HiveIntervalYearMonth intervalYearMonth =
                 PrimitiveObjectInspectorUtils.getHiveIntervalYearMonth(
                     object, convertSourcePrimitiveObjectInspectors[logicalColumnIndex]);
             if (intervalYearMonth == null) {
@@ -669,7 +747,7 @@ public class VectorAssignRow {
           break;
         case INTERVAL_DAY_TIME:
           {
-            HiveIntervalDayTime intervalDayTime =
+            final HiveIntervalDayTime intervalDayTime =
                 PrimitiveObjectInspectorUtils.getHiveIntervalDayTime(
                     object, convertSourcePrimitiveObjectInspectors[logicalColumnIndex]);
             if (intervalDayTime == null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/d467e172/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
index fc82cf7..e37816f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
@@ -18,8 +18,8 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
-import java.io.EOFException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -42,8 +42,12 @@ import org.apache.hadoop.hive.serde2.io.TimestampWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
@@ -91,6 +95,82 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
   private VectorDeserializeRow() {
   }
 
+  private static class Field {
+    // Describes how one field is deserialized: its category plus primitive or complex details.
+    private Category category;  // The data type category of the column being deserialized.
+
+    private PrimitiveCategory primitiveCategory;
+                  // The data type primitive category of the column being deserialized (null if complex).
+
+    private int maxLength;
+                  // For the CHAR and VARCHAR data types, the maximum character length of
+                  // the column.  Otherwise, 0.
+
+    private boolean isConvert;  // Are we converting the row column?  False until setIsConvert.
+
+    /*
+     * This member has information for data type conversion.
+     * Left null by both constructors; assigned through setConversionWritable.
+     */
+    Writable conversionWritable;  // NOTE(review): not private like the other members -- confirm.
+                  // Conversion requires source be placed in writable so we can call upon
+                  // VectorAssignRow to convert and assign the row column.
+
+    private ComplexTypeHelper complexTypeHelper;
+                  // For a complex type, a helper object that describes elements, key/value pairs,
+                  // or fields.
+
+    public Field(PrimitiveCategory primitiveCategory, int maxLength) {
+      this.category = Category.PRIMITIVE;  // This constructor always describes a primitive.
+      this.primitiveCategory = primitiveCategory;
+      this.maxLength = maxLength;
+      this.isConvert = false;
+      this.conversionWritable = null;
+      this.complexTypeHelper = null;  // Primitives have no complex type helper.
+    }
+
+    public Field(Category category, ComplexTypeHelper complexTypeHelper) {
+      this.category = category;
+      this.primitiveCategory = null;  // No primitive category is recorded by this constructor.
+      this.maxLength = 0;
+      this.isConvert = false;
+      this.conversionWritable = null;
+      this.complexTypeHelper = complexTypeHelper;
+    }
+
+    public Category getCategory() {
+      return category;
+    }
+
+    public PrimitiveCategory getPrimitiveCategory() {
+      return primitiveCategory;
+    }
+
+    public int getMaxLength() {
+      return maxLength;
+    }
+
+    public void setIsConvert(boolean isConvert) {
+      this.isConvert = isConvert;
+    }
+
+    public boolean getIsConvert() {
+      return isConvert;
+    }
+
+    public void setConversionWritable(Writable conversionWritable) {
+      this.conversionWritable = conversionWritable;
+    }
+
+    public Writable getConversionWritable() {
+      return conversionWritable;
+    }
+
+    public ComplexTypeHelper getComplexHelper() {
+      return complexTypeHelper;
+    }
+  }
+
   /*
    * These members have information for deserializing a row into the VectorizedRowBatch
    * columns.
@@ -105,30 +185,11 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
   private int[] readFieldLogicalIndices;
                 // The logical indices for reading with readField.
 
-  private boolean[] isConvert;
-                // For each column, are we converting the row column?
-
   private int[] projectionColumnNums;
                 // Assigning can be a subset of columns, so this is the projection --
                 // the batch column numbers.
 
-  private Category[] sourceCategories;
-                // The data type category of each column being deserialized.
-
-  private PrimitiveCategory[] sourcePrimitiveCategories;
-                //The data type primitive category of each column being deserialized.
-
-  private int[] maxLengths;
-                // For the CHAR and VARCHAR data types, the maximum character length of
-                // the columns.  Otherwise, 0.
-
-  /*
-   * These members have information for data type conversion.
-   * Not defined if there is no conversion.
-   */
-  Writable[] convertSourceWritables;
-                // Conversion requires source be placed in writable so we can call upon
-                // VectorAssignRow to convert and assign the row column.
+  private Field topLevelFields[];
 
   VectorAssignRow convertVectorAssignRow;
                 // Use its conversion ability.
@@ -137,62 +198,117 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
    * Allocate the source deserialization related arrays.
    */
   private void allocateArrays(int count) {
-    isConvert = new boolean[count];
     projectionColumnNums = new int[count];
     Arrays.fill(projectionColumnNums, -1);
-    sourceCategories = new Category[count];
-    sourcePrimitiveCategories = new PrimitiveCategory[count];
-    maxLengths = new int[count];
+    topLevelFields = new Field[count];
   }
 
-  /*
-   * Allocate the conversion related arrays (optional).
-   */
-  private void allocateConvertArrays(int count) {
-    convertSourceWritables = new Writable[count];
+  /*
+   * Allocate the Field for a primitive source type.  CHAR and VARCHAR record their declared
+   * length as the maximum length; every other primitive category records 0.
+   */
+  private Field allocatePrimitiveField(TypeInfo sourceTypeInfo) {
+    final PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) sourceTypeInfo;
+    final PrimitiveCategory primitiveCategory = primitiveTypeInfo.getPrimitiveCategory();
+
+    // Start from "no length limit" and override only for the length-qualified string types.
+    int maxLength = 0;
+    switch (primitiveCategory) {
+    case CHAR:
+      maxLength = ((CharTypeInfo) primitiveTypeInfo).getLength();
+      break;
+    case VARCHAR:
+      maxLength = ((VarcharTypeInfo) primitiveTypeInfo).getLength();
+      break;
+    default:
+      break;
+    }
+    return new Field(primitiveCategory, maxLength);
+  }
+
+  /*
+   * Allocate the Field for a LIST, MAP, STRUCT or UNION source type.  The Fields of the
+   * element, key/value or member types are allocated recursively through allocateField and
+   * wrapped in the helper class for the category.
+   *
+   * Throws RuntimeException for any other category.
+   */
+  private Field allocateComplexField(TypeInfo sourceTypeInfo) {
+    final Category category = sourceTypeInfo.getCategory();
+    switch (category) {
+    case LIST:
+      {
+        final TypeInfo elementTypeInfo =
+            ((ListTypeInfo) sourceTypeInfo).getListElementTypeInfo();
+        final ListComplexTypeHelper listHelper =
+            new ListComplexTypeHelper(allocateField(elementTypeInfo));
+        return new Field(category, listHelper);
+      }
+    case MAP:
+      {
+        final MapTypeInfo mapTypeInfo = (MapTypeInfo) sourceTypeInfo;
+        final Field keyField = allocateField(mapTypeInfo.getMapKeyTypeInfo());
+        final Field valueField = allocateField(mapTypeInfo.getMapValueTypeInfo());
+        final MapComplexTypeHelper mapHelper = new MapComplexTypeHelper(keyField, valueField);
+        return new Field(category, mapHelper);
+      }
+    case STRUCT:
+      {
+        final StructTypeInfo structTypeInfo = (StructTypeInfo) sourceTypeInfo;
+        final StructComplexTypeHelper structHelper =
+            new StructComplexTypeHelper(
+                allocateFieldArray(structTypeInfo.getAllStructFieldTypeInfos()));
+        return new Field(category, structHelper);
+      }
+    case UNION:
+      {
+        final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) sourceTypeInfo;
+        final UnionComplexTypeHelper unionHelper =
+            new UnionComplexTypeHelper(
+                allocateFieldArray(unionTypeInfo.getAllUnionObjectTypeInfos()));
+        return new Field(category, unionHelper);
+      }
+    default:
+      throw new RuntimeException("Category " + category + " not supported");
+    }
+  }
+
+  /*
+   * Allocate one Field per member type, in list order.
+   */
+  private Field[] allocateFieldArray(List<TypeInfo> memberTypeInfos) {
+    final int memberCount = memberTypeInfos.size();
+    final Field[] memberFields = new Field[memberCount];
+    for (int memberIndex = 0; memberIndex < memberCount; memberIndex++) {
+      memberFields[memberIndex] = allocateField(memberTypeInfos.get(memberIndex));
+    }
+    return memberFields;
+  }
+
+  /*
+   * Allocate the Field describing one source type, dispatching on its category.
+   * Throws RuntimeException for a category that is neither primitive nor one of the
+   * four complex categories.
+   */
+  private Field allocateField(TypeInfo sourceTypeInfo) {
+    final Category category = sourceTypeInfo.getCategory();
+    switch (category) {
+    case PRIMITIVE:
+      return allocatePrimitiveField(sourceTypeInfo);
+    case LIST:
+    case MAP:
+    case STRUCT:
+    case UNION:
+      return allocateComplexField(sourceTypeInfo);
+    default:
+      throw new RuntimeException("Category " + category + " not supported");
+    }
   }
 
   /*
-   * Initialize one column's source deserializtion related arrays.
+   * Initialize one column's source deserialization information.
    */
-  private void initSourceEntry(int logicalColumnIndex, int projectionColumnNum, TypeInfo sourceTypeInfo) {
-    isConvert[logicalColumnIndex] = false;
+  private void initTopLevelField(int logicalColumnIndex, int projectionColumnNum, TypeInfo sourceTypeInfo) {
+
     projectionColumnNums[logicalColumnIndex] = projectionColumnNum;
-    Category sourceCategory = sourceTypeInfo.getCategory();
-    sourceCategories[logicalColumnIndex] = sourceCategory;
-    if (sourceCategory == Category.PRIMITIVE) {
-      PrimitiveTypeInfo sourcePrimitiveTypeInfo = (PrimitiveTypeInfo) sourceTypeInfo;
-      PrimitiveCategory sourcePrimitiveCategory = sourcePrimitiveTypeInfo.getPrimitiveCategory();
-      sourcePrimitiveCategories[logicalColumnIndex] = sourcePrimitiveCategory;
-      switch (sourcePrimitiveCategory) {
-      case CHAR:
-        maxLengths[logicalColumnIndex] = ((CharTypeInfo) sourcePrimitiveTypeInfo).getLength();
-        break;
-      case VARCHAR:
-        maxLengths[logicalColumnIndex] = ((VarcharTypeInfo) sourcePrimitiveTypeInfo).getLength();
-        break;
-      default:
-        // No additional data type specific setting.
-        break;
-      }
-    } else {
-      // We don't currently support complex types.
-      Preconditions.checkState(false);
-    }
+
+    topLevelFields[logicalColumnIndex] = allocateField(sourceTypeInfo);
   }
 
   /*
-   * Initialize the conversion related arrays.  Assumes initSourceEntry has already been called.
+   * Set up conversion for one top-level field.  Assumes initTopLevelField has already been called.
    */
-  private void initConvertTargetEntry(int logicalColumnIndex) {
-    isConvert[logicalColumnIndex] = true;
+  private void addTopLevelConversion(int logicalColumnIndex) {
 
-    if (sourceCategories[logicalColumnIndex] == Category.PRIMITIVE) {
-      convertSourceWritables[logicalColumnIndex] =
-          VectorizedBatchUtil.getPrimitiveWritable(sourcePrimitiveCategories[logicalColumnIndex]);
-    } else {
-      // We don't currently support complex types.
-      Preconditions.checkState(false);
+    final Field field = topLevelFields[logicalColumnIndex];
+    field.setIsConvert(true);
+
+    if (field.getCategory() == Category.PRIMITIVE) {
+
+      field.setConversionWritable(
+          VectorizedBatchUtil.getPrimitiveWritable(field.getPrimitiveCategory()));
     }
   }
 
@@ -206,7 +322,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
 
     for (int i = 0; i < count; i++) {
       int outputColumn = outputColumns[i];
-      initSourceEntry(i, outputColumn, sourceTypeInfos[i]);
+      initTopLevelField(i, outputColumn, sourceTypeInfos[i]);
     }
   }
 
@@ -220,7 +336,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
 
     for (int i = 0; i < count; i++) {
       int outputColumn = outputColumns.get(i);
-      initSourceEntry(i, outputColumn, sourceTypeInfos[i]);
+      initTopLevelField(i, outputColumn, sourceTypeInfos[i]);
     }
   }
 
@@ -234,7 +350,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
 
     for (int i = 0; i < count; i++) {
       int outputColumn = startColumn + i;
-      initSourceEntry(i, outputColumn, sourceTypeInfos[i]);
+      initTopLevelField(i, outputColumn, sourceTypeInfos[i]);
     }
   }
 
@@ -250,7 +366,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
     allocateArrays(columnCount);
 
     int includedCount = 0;
-    int[] includedIndices = new int[columnCount];
+    final int[] includedIndices = new int[columnCount];
 
     for (int i = 0; i < columnCount; i++) {
 
@@ -260,7 +376,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
 
       } else {
 
-        initSourceEntry(i, i, sourceTypeInfos[i]);
+        initTopLevelField(i, i, sourceTypeInfos[i]);
         includedIndices[includedCount++] = i;
       }
     }
@@ -298,7 +414,6 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
 
     final int columnCount = sourceTypeInfos.length;
     allocateArrays(columnCount);
-    allocateConvertArrays(columnCount);
 
     int includedCount = 0;
     int[] includedIndices = new int[columnCount];
@@ -320,20 +435,22 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
           if (VectorPartitionConversion.isImplicitVectorColumnConversion(sourceTypeInfo, targetTypeInfo)) {
 
             // Do implicit conversion from source type to target type.
-            initSourceEntry(i, i, sourceTypeInfo);
+            initTopLevelField(i, i, sourceTypeInfo);
 
           } else {
 
             // Do formal conversion...
-            initSourceEntry(i, i, sourceTypeInfo);
-            initConvertTargetEntry(i);
+            initTopLevelField(i, i, sourceTypeInfo);
+
+            // UNDONE: No for List and Map; Yes for Struct and Union when field count different...
+            addTopLevelConversion(i);
             atLeastOneConvert = true;
 
           }
         } else {
 
           // No conversion.
-          initSourceEntry(i, i, sourceTypeInfo);
+          initTopLevelField(i, i, sourceTypeInfo);
 
         }
 
@@ -360,6 +477,379 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
     init(0);
   }
 
+  /*
+   * Store the primitive value currently held by deserializeRead into one entry of a
+   * column vector.
+   *
+   * colVector is cast to the column vector class used for the field's primitive category and
+   * batchIndex is the entry written.  canRetainByteRef allows string family values to be
+   * referenced in place instead of copied; it is honored only when the current bytes are the
+   * inputBytes array itself.
+   *
+   * VOID hands the entry to VectorizedBatchUtil.setNullColIsNullValue; no other case in this
+   * method touches the null flags.
+   *
+   * Throws RuntimeException for a primitive category that has no case below.
+   */
+  private void storePrimitiveRowColumn(ColumnVector colVector, Field field,
+      int batchIndex, boolean canRetainByteRef) throws IOException {
+
+    switch (field.getPrimitiveCategory()) {
+    case VOID:
+      VectorizedBatchUtil.setNullColIsNullValue(colVector, batchIndex);
+      return;
+    case BOOLEAN:
+      ((LongColumnVector) colVector).vector[batchIndex] = (deserializeRead.currentBoolean ? 1 : 0);
+      break;
+    case BYTE:
+      ((LongColumnVector) colVector).vector[batchIndex] = deserializeRead.currentByte;
+      break;
+    case SHORT:
+      ((LongColumnVector) colVector).vector[batchIndex] = deserializeRead.currentShort;
+      break;
+    case INT:
+      ((LongColumnVector) colVector).vector[batchIndex] = deserializeRead.currentInt;
+      break;
+    case LONG:
+      ((LongColumnVector) colVector).vector[batchIndex] = deserializeRead.currentLong;
+      break;
+    case TIMESTAMP:
+      ((TimestampColumnVector) colVector).set(
+          batchIndex, deserializeRead.currentTimestampWritable.getTimestamp());
+      break;
+    case DATE:
+      ((LongColumnVector) colVector).vector[batchIndex] = deserializeRead.currentDateWritable.getDays();
+      break;
+    case FLOAT:
+      ((DoubleColumnVector) colVector).vector[batchIndex] = deserializeRead.currentFloat;
+      break;
+    case DOUBLE:
+      ((DoubleColumnVector) colVector).vector[batchIndex] = deserializeRead.currentDouble;
+      break;
+    case BINARY:
+    case STRING:
+      {
+        final BytesColumnVector bytesColVec = ((BytesColumnVector) colVector);
+        if (deserializeRead.currentExternalBufferNeeded) {
+          // The reader asks for an external buffer: have it copy the value straight into the
+          // vector's preallocated value area.
+          bytesColVec.ensureValPreallocated(deserializeRead.currentExternalBufferNeededLen);
+          deserializeRead.copyToExternalBuffer(
+              bytesColVec.getValPreallocatedBytes(), bytesColVec.getValPreallocatedStart());
+          bytesColVec.setValPreallocated(
+              batchIndex,
+              deserializeRead.currentExternalBufferNeededLen);
+        } else if (canRetainByteRef && inputBytes == deserializeRead.currentBytes) {
+          // Reference the bytes in place -- only when the caller allows it and the bytes are
+          // the inputBytes array.
+          bytesColVec.setRef(
+              batchIndex,
+              deserializeRead.currentBytes,
+              deserializeRead.currentBytesStart,
+              deserializeRead.currentBytesLength);
+        } else {
+          // Otherwise hand the byte range to setVal (presumably a copy into the vector's own
+          // buffer -- confirm against BytesColumnVector).
+          bytesColVec.setVal(
+              batchIndex,
+              deserializeRead.currentBytes,
+              deserializeRead.currentBytesStart,
+              deserializeRead.currentBytesLength);
+        }
+      }
+      break;
+    case VARCHAR:
+      {
+        // Use the basic STRING bytes read to get access, then use our optimal truncate/trim method
+        // that does not use Java String objects.
+        // Same three storage paths as STRING; only the stored length differs (truncated to the
+        // field's maximum length).
+        final BytesColumnVector bytesColVec = ((BytesColumnVector) colVector);
+        if (deserializeRead.currentExternalBufferNeeded) {
+          // Write directly into our BytesColumnVector value buffer.
+          bytesColVec.ensureValPreallocated(deserializeRead.currentExternalBufferNeededLen);
+          final byte[] convertBuffer = bytesColVec.getValPreallocatedBytes();
+          final int convertBufferStart = bytesColVec.getValPreallocatedStart();
+          deserializeRead.copyToExternalBuffer(
+              convertBuffer,
+              convertBufferStart);
+          bytesColVec.setValPreallocated(
+              batchIndex,
+              StringExpr.truncate(
+                  convertBuffer,
+                  convertBufferStart,
+                  deserializeRead.currentExternalBufferNeededLen,
+                  field.getMaxLength()));
+        } else if (canRetainByteRef && inputBytes == deserializeRead.currentBytes) {
+          bytesColVec.setRef(
+              batchIndex,
+              deserializeRead.currentBytes,
+              deserializeRead.currentBytesStart,
+              StringExpr.truncate(
+                  deserializeRead.currentBytes,
+                  deserializeRead.currentBytesStart,
+                  deserializeRead.currentBytesLength,
+                  field.getMaxLength()));
+        } else {
+          bytesColVec.setVal(
+              batchIndex,
+              deserializeRead.currentBytes,
+              deserializeRead.currentBytesStart,
+              StringExpr.truncate(
+                  deserializeRead.currentBytes,
+                  deserializeRead.currentBytesStart,
+                  deserializeRead.currentBytesLength,
+                  field.getMaxLength()));
+        }
+      }
+      break;
+    case CHAR:
+      {
+        // Use the basic STRING bytes read to get access, then use our optimal truncate/trim method
+        // that does not use Java String objects.
+        // Same three storage paths as STRING; the stored length comes from
+        // StringExpr.rightTrimAndTruncate with the field's maximum length.
+        final BytesColumnVector bytesColVec = ((BytesColumnVector) colVector);
+        if (deserializeRead.currentExternalBufferNeeded) {
+          // Write directly into our BytesColumnVector value buffer.
+          bytesColVec.ensureValPreallocated(deserializeRead.currentExternalBufferNeededLen);
+          final byte[] convertBuffer = bytesColVec.getValPreallocatedBytes();
+          final int convertBufferStart = bytesColVec.getValPreallocatedStart();
+          deserializeRead.copyToExternalBuffer(
+              convertBuffer,
+              convertBufferStart);
+          bytesColVec.setValPreallocated(
+              batchIndex,
+              StringExpr.rightTrimAndTruncate(
+                  convertBuffer,
+                  convertBufferStart,
+                  deserializeRead.currentExternalBufferNeededLen,
+                  field.getMaxLength()));
+        } else if (canRetainByteRef && inputBytes == deserializeRead.currentBytes) {
+          bytesColVec.setRef(
+              batchIndex,
+              deserializeRead.currentBytes,
+              deserializeRead.currentBytesStart,
+              StringExpr.rightTrimAndTruncate(
+                  deserializeRead.currentBytes,
+                  deserializeRead.currentBytesStart,
+                  deserializeRead.currentBytesLength,
+                  field.getMaxLength()));
+        } else {
+          bytesColVec.setVal(
+              batchIndex,
+              deserializeRead.currentBytes,
+              deserializeRead.currentBytesStart,
+              StringExpr.rightTrimAndTruncate(
+                  deserializeRead.currentBytes,
+                  deserializeRead.currentBytesStart,
+                  deserializeRead.currentBytesLength,
+                  field.getMaxLength()));
+        }
+      }
+      break;
+    case DECIMAL:
+      // The DecimalColumnVector set method will quickly copy the deserialized decimal writable fields.
+      ((DecimalColumnVector) colVector).set(
+          batchIndex, deserializeRead.currentHiveDecimalWritable);
+      break;
+    case INTERVAL_YEAR_MONTH:
+      ((LongColumnVector) colVector).vector[batchIndex] =
+          deserializeRead.currentHiveIntervalYearMonthWritable.getHiveIntervalYearMonth().getTotalMonths();
+      break;
+    case INTERVAL_DAY_TIME:
+      ((IntervalDayTimeColumnVector) colVector).set(
+          batchIndex, deserializeRead.currentHiveIntervalDayTimeWritable.getHiveIntervalDayTime());
+      break;
+    default:
+      throw new RuntimeException("Primitive category " + field.getPrimitiveCategory() +
+          " not supported");
+    }
+  }
+
+  /*
+   * Common base type of the per-category helpers (list, map, struct, union) that a complex
+   * Field hands out through getComplexHelper.  It declares no members of its own.
+   */
+  private static class ComplexTypeHelper {
+  }
+
+  /*
+   * Helper for a LIST field: holds the Field that describes the list's element type.
+   */
+  private static class ListComplexTypeHelper extends ComplexTypeHelper {
+
+    // Assigned once by the constructor and never changed afterwards.
+    private final Field elementField;
+
+    public ListComplexTypeHelper(Field elementField) {
+      this.elementField = elementField;
+    }
+
+    // The Field describing each element of the list.
+    public Field getElementField() {
+      return elementField;
+    }
+  }
+
+  /*
+   * Helper for a MAP field: holds the Fields that describe the map's key and value types.
+   */
+  private static class MapComplexTypeHelper extends ComplexTypeHelper {
+
+    // Both are assigned once by the constructor and never changed afterwards.
+    private final Field keyField;
+    private final Field valueField;
+
+    public MapComplexTypeHelper(Field keyField, Field valueField) {
+      this.keyField = keyField;
+      this.valueField = valueField;
+    }
+
+    // The Field describing each map key.
+    public Field getKeyField() {
+      return keyField;
+    }
+
+    // The Field describing each map value.
+    public Field getValueField() {
+      return valueField;
+    }
+  }
+
+  /*
+   * Shared base of the struct and union helpers: holds one Field per member type.
+   */
+  private static class FieldsComplexTypeHelper extends ComplexTypeHelper {
+
+    // The array reference is assigned once by the constructor; the array itself is shared
+    // with the caller, not copied.
+    private final Field[] fields;
+
+    public FieldsComplexTypeHelper(Field[] fields) {
+      this.fields = fields;
+    }
+
+    // The member Fields, in the order they were supplied to the constructor.
+    public Field[] getFields() {
+      return fields;
+    }
+  }
+
+  /*
+   * Helper for a STRUCT field.  Adds nothing to FieldsComplexTypeHelper; the member Fields
+   * are the struct's field types.
+   */
+  private static class StructComplexTypeHelper extends FieldsComplexTypeHelper {
+
+    public StructComplexTypeHelper(Field[] fields) {
+      super(fields);
+    }
+  }
+
+  /*
+   * Helper for a UNION field.  Adds nothing to FieldsComplexTypeHelper; the member Fields
+   * are the union's alternative object types, indexed by tag when a union value is stored.
+   */
+  private static class UnionComplexTypeHelper extends FieldsComplexTypeHelper {
+
+    public UnionComplexTypeHelper(Field[] fields) {
+      super(fields);
+    }
+  }
+
+  // UNDONE: Presumption of *append*
+
+  /*
+   * Read the next field nested inside a complex value and store it into one entry of the
+   * member's column vector.
+   *
+   * When deserializeRead.readComplexField() reports no value, the entry is marked NULL and
+   * nothing else is stored.  Otherwise the value is stored by the method for the field's
+   * category.
+   *
+   * Throws RuntimeException for an unsupported category.
+   */
+  private void storeComplexFieldRowColumn(ColumnVector fieldColVector,
+      Field field, int batchIndex, boolean canRetainByteRef) throws IOException {
+
+    if (!deserializeRead.readComplexField()) {
+      fieldColVector.isNull[batchIndex] = true;
+      fieldColVector.noNulls = false;
+      return;
+    }
+
+    // There is a value: clear the null flag, as the list/map/struct/union store methods do
+    // for their own vectors.  storePrimitiveRowColumn does not touch the flag for non-VOID
+    // values, so without this a primitive member could keep a stale NULL mark when the entry
+    // is rewritten.  This is done before the dispatch so the VOID case can still mark the
+    // entry NULL.
+    fieldColVector.isNull[batchIndex] = false;
+
+    switch (field.getCategory()) {
+    case PRIMITIVE:
+      storePrimitiveRowColumn(fieldColVector, field, batchIndex, canRetainByteRef);
+      break;
+    case LIST:
+      storeListRowColumn(fieldColVector, field, batchIndex, canRetainByteRef);
+      break;
+    case MAP:
+      storeMapRowColumn(fieldColVector, field, batchIndex, canRetainByteRef);
+      break;
+    case STRUCT:
+      storeStructRowColumn(fieldColVector, field, batchIndex, canRetainByteRef);
+      break;
+    case UNION:
+      storeUnionRowColumn(fieldColVector, field, batchIndex, canRetainByteRef);
+      break;
+    default:
+      throw new RuntimeException("Category " + field.getCategory() + " not supported");
+    }
+  }
+
+  /*
+   * Store the list value currently being read into one entry of a ListColumnVector.
+   *
+   * The elements are appended to the child vector starting at the vector's current
+   * childCount; the entry's offset and length are recorded and childCount is advanced by the
+   * number of elements read.
+   */
+  private void storeListRowColumn(ColumnVector colVector,
+      Field field, int batchIndex, boolean canRetainByteRef) throws IOException {
+
+    final ListColumnVector listColVector = (ListColumnVector) colVector;
+    final ColumnVector elementColVector = listColVector.child;
+    int offset = listColVector.childCount;
+    listColVector.isNull[batchIndex] = false;
+    listColVector.offsets[batchIndex] = offset;
+
+    final ListComplexTypeHelper listHelper = (ListComplexTypeHelper) field.getComplexHelper();
+    final Field elementField = listHelper.getElementField();
+
+    int listLength = 0;
+    while (deserializeRead.isNextComplexMultiValue()) {
+
+      // Ensure child size.  The check must follow the index about to be written (offset):
+      // childCount is only advanced after the loop, so sizing against it would let a long
+      // list run past the end of the child vector.
+      final int childCapacity = elementColVector.isNull.length;
+      if (childCapacity < (offset + 1) / 0.75) {
+        elementColVector.ensureSize(Math.max(childCapacity * 2, offset + 1), true);
+      }
+
+      storeComplexFieldRowColumn(
+          elementColVector, elementField, offset, canRetainByteRef);
+      offset++;
+      listLength++;
+    }
+
+    listColVector.childCount += listLength;
+    listColVector.lengths[batchIndex] = listLength;
+  }
+
+  /*
+   * Store the map value currently being read into one entry of a MapColumnVector.
+   *
+   * Each key/value pair is appended to the keys and values child vectors at the same index,
+   * starting at the vector's current childCount; the entry's offset and length are recorded
+   * and childCount is advanced by the number of pairs read.
+   */
+  private void storeMapRowColumn(ColumnVector colVector,
+      Field field, int batchIndex, boolean canRetainByteRef) throws IOException {
+
+    final MapColumnVector mapColVector = (MapColumnVector) colVector;
+    final MapComplexTypeHelper mapHelper = (MapComplexTypeHelper) field.getComplexHelper();
+    final ColumnVector keysColVector = mapColVector.keys;
+    final ColumnVector valuesColVector = mapColVector.values;
+    int offset = mapColVector.childCount;
+    mapColVector.offsets[batchIndex] = offset;
+    mapColVector.isNull[batchIndex] = false;
+
+    int keyValueCount = 0;
+    while (deserializeRead.isNextComplexMultiValue()) {
+
+      // Ensure child size.  The check must follow the index about to be written (offset):
+      // childCount is only advanced after the loop, so sizing against it would let a large
+      // map run past the end of the child vectors.
+      final int childCapacity = keysColVector.isNull.length;
+      if (childCapacity < (offset + 1) / 0.75) {
+        final int newCapacity = Math.max(childCapacity * 2, offset + 1);
+        keysColVector.ensureSize(newCapacity, true);
+        valuesColVector.ensureSize(newCapacity, true);
+      }
+
+      // Key.
+      storeComplexFieldRowColumn(
+          keysColVector, mapHelper.getKeyField(), offset, canRetainByteRef);
+
+      // Value.
+      storeComplexFieldRowColumn(
+          valuesColVector, mapHelper.getValueField(), offset, canRetainByteRef);
+
+      offset++;
+      keyValueCount++;
+    }
+
+    mapColVector.childCount += keyValueCount;
+    mapColVector.lengths[batchIndex] = keyValueCount;
+  }
+
+  /*
+   * Store the struct value currently being read into one entry of a StructColumnVector.
+   * Every member column vector receives its member value at the same batchIndex, in member
+   * order, after which the reader is told the struct is finished.
+   */
+  private void storeStructRowColumn(ColumnVector colVector,
+      Field field, int batchIndex, boolean canRetainByteRef) throws IOException {
+
+    final StructColumnVector structColVector = (StructColumnVector) colVector;
+    final ColumnVector[] memberColVectors = structColVector.fields;
+    final Field[] memberFields =
+        ((StructComplexTypeHelper) field.getComplexHelper()).getFields();
+    structColVector.isNull[batchIndex] = false;
+
+    // The member column vectors drive the iteration; memberFields is indexed in step.
+    for (int memberIndex = 0; memberIndex < memberColVectors.length; memberIndex++) {
+      storeComplexFieldRowColumn(
+          memberColVectors[memberIndex],
+          memberFields[memberIndex],
+          batchIndex,
+          canRetainByteRef);
+    }
+    deserializeRead.finishComplexVariableFieldsType();
+  }
+
+  /*
+   * Store the union value currently being read into one entry of a UnionColumnVector.
+   * The tag is read first and recorded, then the value is stored into the member column
+   * vector selected by that tag, after which the reader is told the union is finished.
+   */
+  private void storeUnionRowColumn(ColumnVector colVector,
+      Field field, int batchIndex, boolean canRetainByteRef) throws IOException {
+
+    // Reading the union's first complex field makes its tag available in currentInt.
+    deserializeRead.readComplexField();
+    final int unionTag = deserializeRead.currentInt;
+
+    final UnionColumnVector unionColVector = (UnionColumnVector) colVector;
+    final ColumnVector[] memberColVectors = unionColVector.fields;
+    final UnionComplexTypeHelper unionHelper = (UnionComplexTypeHelper) field.getComplexHelper();
+
+    unionColVector.isNull[batchIndex] = false;
+    unionColVector.tags[batchIndex] = unionTag;
+
+    final ColumnVector taggedColVector = memberColVectors[unionTag];
+    final Field taggedField = unionHelper.getFields()[unionTag];
+    storeComplexFieldRowColumn(taggedColVector, taggedField, batchIndex, canRetainByteRef);
+    deserializeRead.finishComplexVariableFieldsType();
+  }
+
   /**
    * Store one row column value that is the current value in deserializeRead.
    *
@@ -374,186 +864,29 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
    * @throws IOException
    */
   private void storeRowColumn(VectorizedRowBatch batch, int batchIndex,
-      int logicalColumnIndex, boolean canRetainByteRef) throws IOException {
+      Field field, int logicalColumnIndex, boolean canRetainByteRef) throws IOException {
 
     final int projectionColumnNum = projectionColumnNums[logicalColumnIndex];
-    switch (sourceCategories[logicalColumnIndex]) {
+    ColumnVector colVector = batch.cols[projectionColumnNum];
+
+    switch (field.getCategory()) {
     case PRIMITIVE:
-      {
-        PrimitiveCategory sourcePrimitiveCategory = sourcePrimitiveCategories[logicalColumnIndex];
-        switch (sourcePrimitiveCategory) {
-        case VOID:
-          VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex);
-          return;
-        case BOOLEAN:
-          ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
-              (deserializeRead.currentBoolean ? 1 : 0);
-          break;
-        case BYTE:
-          ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
-              deserializeRead.currentByte;
-          break;
-        case SHORT:
-          ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
-              deserializeRead.currentShort;
-          break;
-        case INT:
-          ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
-              deserializeRead.currentInt;
-          break;
-        case LONG:
-          ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
-              deserializeRead.currentLong;
-          break;
-        case TIMESTAMP:
-          ((TimestampColumnVector) batch.cols[projectionColumnNum]).set(
-              batchIndex, deserializeRead.currentTimestampWritable.getTimestamp());
-          break;
-        case DATE:
-          ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
-              deserializeRead.currentDateWritable.getDays();
-          break;
-        case FLOAT:
-          ((DoubleColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
-              deserializeRead.currentFloat;
-          break;
-        case DOUBLE:
-          ((DoubleColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
-              deserializeRead.currentDouble;
-          break;
-        case BINARY:
-        case STRING:
-          {
-            BytesColumnVector bytesColVec = ((BytesColumnVector) batch.cols[projectionColumnNum]);
-            if (deserializeRead.currentExternalBufferNeeded) {
-              bytesColVec.ensureValPreallocated(deserializeRead.currentExternalBufferNeededLen);
-              deserializeRead.copyToExternalBuffer(
-                  bytesColVec.getValPreallocatedBytes(), bytesColVec.getValPreallocatedStart());
-              bytesColVec.setValPreallocated(
-                  batchIndex,
-                  deserializeRead.currentExternalBufferNeededLen);
-            } else if (canRetainByteRef && inputBytes == deserializeRead.currentBytes) {
-              bytesColVec.setRef(
-                  batchIndex,
-                  deserializeRead.currentBytes,
-                  deserializeRead.currentBytesStart,
-                  deserializeRead.currentBytesLength);
-            } else {
-              bytesColVec.setVal(
-                  batchIndex,
-                  deserializeRead.currentBytes,
-                  deserializeRead.currentBytesStart,
-                  deserializeRead.currentBytesLength);
-            }
-          }
-          break;
-        case VARCHAR:
-          {
-            // Use the basic STRING bytes read to get access, then use our optimal truncate/trim method
-            // that does not use Java String objects.
-            BytesColumnVector bytesColVec = ((BytesColumnVector) batch.cols[projectionColumnNum]);
-            if (deserializeRead.currentExternalBufferNeeded) {
-              // Write directly into our BytesColumnVector value buffer.
-              bytesColVec.ensureValPreallocated(deserializeRead.currentExternalBufferNeededLen);
-              byte[] convertBuffer = bytesColVec.getValPreallocatedBytes();
-              int convertBufferStart = bytesColVec.getValPreallocatedStart();
-              deserializeRead.copyToExternalBuffer(
-                  convertBuffer,
-                  convertBufferStart);
-              bytesColVec.setValPreallocated(
-                  batchIndex,
-                  StringExpr.truncate(
-                      convertBuffer,
-                      convertBufferStart,
-                      deserializeRead.currentExternalBufferNeededLen,
-                      maxLengths[logicalColumnIndex]));
-            } else if (canRetainByteRef && inputBytes == deserializeRead.currentBytes) {
-              bytesColVec.setRef(
-                  batchIndex,
-                  deserializeRead.currentBytes,
-                  deserializeRead.currentBytesStart,
-                  StringExpr.truncate(
-                      deserializeRead.currentBytes,
-                      deserializeRead.currentBytesStart,
-                      deserializeRead.currentBytesLength,
-                      maxLengths[logicalColumnIndex]));
-            } else {
-              bytesColVec.setVal(
-                  batchIndex,
-                  deserializeRead.currentBytes,
-                  deserializeRead.currentBytesStart,
-                  StringExpr.truncate(
-                      deserializeRead.currentBytes,
-                      deserializeRead.currentBytesStart,
-                      deserializeRead.currentBytesLength,
-                      maxLengths[logicalColumnIndex]));
-            }
-          }
-          break;
-        case CHAR:
-          {
-            // Use the basic STRING bytes read to get access, then use our optimal truncate/trim method
-            // that does not use Java String objects.
-            BytesColumnVector bytesColVec = ((BytesColumnVector) batch.cols[projectionColumnNum]);
-            if (deserializeRead.currentExternalBufferNeeded) {
-              // Write directly into our BytesColumnVector value buffer.
-              bytesColVec.ensureValPreallocated(deserializeRead.currentExternalBufferNeededLen);
-              byte[] convertBuffer = bytesColVec.getValPreallocatedBytes();
-              int convertBufferStart = bytesColVec.getValPreallocatedStart();
-              deserializeRead.copyToExternalBuffer(
-                  convertBuffer,
-                  convertBufferStart);
-              bytesColVec.setValPreallocated(
-                  batchIndex,
-                  StringExpr.rightTrimAndTruncate(
-                      convertBuffer,
-                      convertBufferStart,
-                      deserializeRead.currentExternalBufferNeededLen,
-                      maxLengths[logicalColumnIndex]));
-            } else if (canRetainByteRef && inputBytes == deserializeRead.currentBytes) {
-              bytesColVec.setRef(
-                  batchIndex,
-                  deserializeRead.currentBytes,
-                  deserializeRead.currentBytesStart,
-                  StringExpr.rightTrimAndTruncate(
-                      deserializeRead.currentBytes,
-                      deserializeRead.currentBytesStart,
-                      deserializeRead.currentBytesLength,
-                      maxLengths[logicalColumnIndex]));
-            } else {
-              bytesColVec.setVal(
-                  batchIndex,
-                  deserializeRead.currentBytes,
-                  deserializeRead.currentBytesStart,
-                  StringExpr.rightTrimAndTruncate(
-                      deserializeRead.currentBytes,
-                      deserializeRead.currentBytesStart,
-                      deserializeRead.currentBytesLength,
-                      maxLengths[logicalColumnIndex]));
-            }
-          }
-          break;
-        case DECIMAL:
-          // The DecimalColumnVector set method will quickly copy the deserialized decimal writable fields.
-          ((DecimalColumnVector) batch.cols[projectionColumnNum]).set(
-              batchIndex, deserializeRead.currentHiveDecimalWritable);
-          break;
-        case INTERVAL_YEAR_MONTH:
-          ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] =
-              deserializeRead.currentHiveIntervalYearMonthWritable.getHiveIntervalYearMonth().getTotalMonths();
-          break;
-        case INTERVAL_DAY_TIME:
-          ((IntervalDayTimeColumnVector) batch.cols[projectionColumnNum]).set(
-              batchIndex, deserializeRead.currentHiveIntervalDayTimeWritable.getHiveIntervalDayTime());
-          break;
-        default:
-          throw new RuntimeException("Primitive category " + sourcePrimitiveCategory.name() +
-              " not supported");
-        }
-      }
+      storePrimitiveRowColumn(colVector, field, batchIndex, canRetainByteRef);
+      break;
+    case LIST:
+      storeListRowColumn(colVector, field, batchIndex, canRetainByteRef);
+      break;
+    case MAP:
+      storeMapRowColumn(colVector, field, batchIndex, canRetainByteRef);
+      break;
+    case STRUCT:
+      storeStructRowColumn(colVector, field, batchIndex, canRetainByteRef);
+      break;
+    case UNION:
+      storeUnionRowColumn(colVector, field, batchIndex, canRetainByteRef);
       break;
     default:
-      throw new RuntimeException("Category " + sourceCategories[logicalColumnIndex] + " not supported");
+      throw new RuntimeException("Category " + field.getCategory() + " not supported");
     }
 
     // We always set the null flag to false when there is a value.
@@ -572,13 +905,13 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
    * @throws IOException
    */
   private void convertRowColumn(VectorizedRowBatch batch, int batchIndex,
-      int logicalColumnIndex) throws IOException {
-    final int projectionColumnNum = projectionColumnNums[logicalColumnIndex];
-    Writable convertSourceWritable = convertSourceWritables[logicalColumnIndex];
-    switch (sourceCategories[logicalColumnIndex]) {
+      Field field, int logicalColumnIndex) throws IOException {
+
+    Writable convertSourceWritable = field.getConversionWritable();
+    switch (field.getCategory()) {
     case PRIMITIVE:
       {
-        switch (sourcePrimitiveCategories[logicalColumnIndex]) {
+        switch (field.getPrimitiveCategory()) {
         case VOID:
           convertSourceWritable = null;
           break;
@@ -611,7 +944,9 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
           break;
         case BINARY:
           if (deserializeRead.currentBytes == null) {
-            LOG.info("null binary entry: batchIndex " + batchIndex + " projection column num " + projectionColumnNum);
+            LOG.info(
+                "null binary entry: batchIndex " + batchIndex + " projection column num " +
+                projectionColumnNums[logicalColumnIndex]);
           }
 
           ((BytesWritable) convertSourceWritable).set(
@@ -622,7 +957,8 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
         case STRING:
           if (deserializeRead.currentBytes == null) {
             throw new RuntimeException(
-                "null string entry: batchIndex " + batchIndex + " projection column num " + projectionColumnNum);
+                "null string entry: batchIndex " + batchIndex + " projection column num " +
+                projectionColumnNums[logicalColumnIndex]);
           }
 
           // Use org.apache.hadoop.io.Text as our helper to go from byte[] to String.
@@ -637,14 +973,15 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
             // that does not use Java String objects.
             if (deserializeRead.currentBytes == null) {
               throw new RuntimeException(
-                  "null varchar entry: batchIndex " + batchIndex + " projection column num " + projectionColumnNum);
+                  "null varchar entry: batchIndex " + batchIndex + " projection column num " +
+                  projectionColumnNums[logicalColumnIndex]);
             }
 
             int adjustedLength = StringExpr.truncate(
                 deserializeRead.currentBytes,
                 deserializeRead.currentBytesStart,
                 deserializeRead.currentBytesLength,
-                maxLengths[logicalColumnIndex]);
+                field.getMaxLength());
 
             ((HiveVarcharWritable) convertSourceWritable).set(
                 new String(
@@ -661,14 +998,15 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
             // that does not use Java String objects.
             if (deserializeRead.currentBytes == null) {
               throw new RuntimeException(
-                  "null char entry: batchIndex " + batchIndex + " projection column num " + projectionColumnNum);
+                  "null char entry: batchIndex " + batchIndex + " projection column num " +
+                  projectionColumnNums[logicalColumnIndex]);
             }
 
             int adjustedLength = StringExpr.rightTrimAndTruncate(
                 deserializeRead.currentBytes,
                 deserializeRead.currentBytesStart,
                 deserializeRead.currentBytesLength,
-                maxLengths[logicalColumnIndex]);
+                field.getMaxLength());
 
             ((HiveCharWritable) convertSourceWritable).set(
                 new String(
@@ -691,13 +1029,26 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
               deserializeRead.currentHiveIntervalDayTimeWritable);
           break;
         default:
-          throw new RuntimeException("Primitive category " + sourcePrimitiveCategories[logicalColumnIndex] +
+          throw new RuntimeException("Primitive category " + field.getPrimitiveCategory() +
               " not supported");
         }
       }
       break;
+
+    case STRUCT:
+    case UNION:
+      // The only conversion that applies to a Struct / Union itself is adding fields as NULL at
+      // the end (no removal from the end? which would mean skipping fields...)
+
+      // UNDONE
+      break;
+
+    case LIST:
+    case MAP:
+      // Conversion only happens below, to List elements or Map keys and/or values, not to the
+      // List or Map itself, so these cases fall through to the default and throw.
     default:
-      throw new RuntimeException("Category " + sourceCategories[logicalColumnIndex] + " not supported");
+      throw new RuntimeException("Category " + field.getCategory() + " not supported");
     }
 
     /*
@@ -739,7 +1090,10 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
     // Pass false for canRetainByteRef since we will NOT be keeping byte references to the input
     // bytes with the BytesColumnVector.setRef method.
 
-    final int count = isConvert.length;
+    final int count = topLevelFields.length;
+
+    Field field;
+
     if (!useReadField) {
       for (int i = 0; i < count; i++) {
         final int projectionColumnNum = projectionColumnNums[i];
@@ -755,10 +1109,11 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
           continue;
         }
         // The current* members of deserializeRead have the field value.
-        if (isConvert[i]) {
-          convertRowColumn(batch, batchIndex, i);
+        field = topLevelFields[i];
+        if (field.getIsConvert()) {
+          convertRowColumn(batch, batchIndex, field, i);
         } else {
-          storeRowColumn(batch, batchIndex, i, /* canRetainByteRef */ false);
+          storeRowColumn(batch, batchIndex, field, i, /* canRetainByteRef */ false);
         }
       }
     } else {
@@ -773,10 +1128,11 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
           continue;
         }
         // The current* members of deserializeRead have the field value.
-        if (isConvert[logicalIndex]) {
-          convertRowColumn(batch, batchIndex, logicalIndex);
+        field = topLevelFields[logicalIndex];
+        if (field.getIsConvert()) {
+          convertRowColumn(batch, batchIndex, field, logicalIndex);
         } else {
-          storeRowColumn(batch, batchIndex, logicalIndex, /* canRetainByteRef */ false);
+          storeRowColumn(batch, batchIndex, field, logicalIndex, /* canRetainByteRef */ false);
         }
       }
     }
@@ -803,7 +1159,11 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
    * @throws IOException
    */
   public void deserializeByRef(VectorizedRowBatch batch, int batchIndex) throws IOException {
-    final int count = isConvert.length;
+
+    final int count = topLevelFields.length;
+
+    Field field;
+
     if (!useReadField) {
       for (int i = 0; i < count; i++) {
         final int projectionColumnNum = projectionColumnNums[i];
@@ -819,10 +1179,11 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
           continue;
         }
         // The current* members of deserializeRead have the field value.
-        if (isConvert[i]) {
-          convertRowColumn(batch, batchIndex, i);
+        field = topLevelFields[i];
+        if (field.getIsConvert()) {
+          convertRowColumn(batch, batchIndex, field, i);
         } else {
-          storeRowColumn(batch, batchIndex, i, /* canRetainByteRef */ true);
+          storeRowColumn(batch, batchIndex, field, i, /* canRetainByteRef */ true);
         }
       }
     } else {
@@ -837,10 +1198,11 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
           continue;
         }
         // The current* members of deserializeRead have the field value.
-        if (isConvert[logicalIndex]) {
-          convertRowColumn(batch, batchIndex, logicalIndex);
+        field = topLevelFields[logicalIndex];
+        if (field.getIsConvert()) {
+          convertRowColumn(batch, batchIndex, field, logicalIndex);
         } else {
-          storeRowColumn(batch, batchIndex, logicalIndex, /* canRetainByteRef */ true);
+          storeRowColumn(batch, batchIndex, field, logicalIndex, /* canRetainByteRef */ true);
         }
       }
     }