You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/24 23:51:50 UTC

[16/54] [abbrv] hive git commit: HIVE-16207: Add support for Complex Types in Fast SerDe (Teddy Choi, reviewed by Matt McCline)

http://git-wip-us.apache.org/repos/asf/hive/blob/d467e172/serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java b/serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java
index cb775f7..889e448 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.serde2.fast;
 
 import java.io.IOException;
+
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.io.HiveIntervalDayTimeWritable;
@@ -26,8 +27,12 @@ import org.apache.hadoop.hive.serde2.io.HiveIntervalYearMonthWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
 
 /*
  * Directly deserialize with the caller reading field-by-field a serialization format.
@@ -52,6 +57,68 @@ public abstract class DeserializeRead {
   protected Category[] categories;
   protected PrimitiveCategory[] primitiveCategories;
 
+  /*
+   * This class is used to read one field at a time.  Simple fields like long, double, int are read
+   * into primitive current* members; the non-simple field types like Date, Timestamp, etc., are
+   * read into a current object that this method will allocate.
+   *
+   * This method handles complex type fields by recursively calling this method.
+   */
+  private void allocateCurrentWritable(TypeInfo typeInfo) {
+    switch (typeInfo.getCategory()) {
+    case PRIMITIVE:
+      switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
+      case DATE:
+        if (currentDateWritable == null) {
+          currentDateWritable = new DateWritable();
+        }
+        break;
+      case TIMESTAMP:
+        if (currentTimestampWritable == null) {
+          currentTimestampWritable = new TimestampWritable();
+        }
+        break;
+      case INTERVAL_YEAR_MONTH:
+        if (currentHiveIntervalYearMonthWritable == null) {
+          currentHiveIntervalYearMonthWritable = new HiveIntervalYearMonthWritable();
+        }
+        break;
+      case INTERVAL_DAY_TIME:
+        if (currentHiveIntervalDayTimeWritable == null) {
+          currentHiveIntervalDayTimeWritable = new HiveIntervalDayTimeWritable();
+        }
+        break;
+      case DECIMAL:
+        if (currentHiveDecimalWritable == null) {
+          currentHiveDecimalWritable = new HiveDecimalWritable();
+        }
+        break;
+      default:
+        // No writable needed for this data type.
+      }
+      break;
+    case LIST:
+      allocateCurrentWritable(((ListTypeInfo) typeInfo).getListElementTypeInfo());
+      break;
+    case MAP:
+      allocateCurrentWritable(((MapTypeInfo) typeInfo).getMapKeyTypeInfo());
+      allocateCurrentWritable(((MapTypeInfo) typeInfo).getMapValueTypeInfo());
+      break;
+    case STRUCT:
+      for (TypeInfo fieldTypeInfo : ((StructTypeInfo) typeInfo).getAllStructFieldTypeInfos()) {
+        allocateCurrentWritable(fieldTypeInfo);
+      }
+      break;
+    case UNION:
+      for (TypeInfo fieldTypeInfo : ((UnionTypeInfo) typeInfo).getAllUnionObjectTypeInfos()) {
+        allocateCurrentWritable(fieldTypeInfo);
+      }
+      break;
+    default:
+      throw new RuntimeException("Unexpected category " + typeInfo.getCategory());
+    }
+  }
+
   /**
    * Constructor.
    *
@@ -85,37 +152,8 @@ public abstract class DeserializeRead {
         PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
         PrimitiveCategory primitiveCategory = primitiveTypeInfo.getPrimitiveCategory();
         primitiveCategories[i] = primitiveCategory;
-
-        switch (primitiveCategory) {
-        case DATE:
-          if (currentDateWritable == null) {
-            currentDateWritable = new DateWritable();
-          }
-          break;
-        case TIMESTAMP:
-          if (currentTimestampWritable == null) {
-            currentTimestampWritable = new TimestampWritable();
-          }
-          break;
-        case INTERVAL_YEAR_MONTH:
-          if (currentHiveIntervalYearMonthWritable == null) {
-            currentHiveIntervalYearMonthWritable = new HiveIntervalYearMonthWritable();
-          }
-          break;
-        case INTERVAL_DAY_TIME:
-          if (currentHiveIntervalDayTimeWritable == null) {
-            currentHiveIntervalDayTimeWritable = new HiveIntervalDayTimeWritable();
-          }
-          break;
-        case DECIMAL:
-          if (currentHiveDecimalWritable == null) {
-            currentHiveDecimalWritable = new HiveDecimalWritable();
-          }
-          break;
-        default:
-          // No writable needed for this data type.
-        }
       }
+      allocateCurrentWritable(typeInfo);
 
       this.useExternalBuffer = useExternalBuffer;
     }
@@ -178,6 +216,22 @@ public abstract class DeserializeRead {
   }
 
   /*
+   * Tests whether there is another List element or another Map key/value pair.
+   */
+  public abstract boolean isNextComplexMultiValue() throws IOException;
+
+  /*
+   * Read a field that is under a complex type.  It may be a primitive type or deeper complex type.
+   */
+  public abstract boolean readComplexField() throws IOException;
+
+  /*
+   * Used by Struct and Union complex type readers to indicate the (final) field has been fully
+   * read and the current complex type is finished.
+   */
+  public abstract void finishComplexVariableFieldsType();
+
+  /*
    * Call this method may be called after all the all fields have been read to check
    * for unread fields.
    *

http://git-wip-us.apache.org/repos/asf/hive/blob/d467e172/serde/src/java/org/apache/hadoop/hive/serde2/fast/SerializeWrite.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/fast/SerializeWrite.java b/serde/src/java/org/apache/hadoop/hive/serde2/fast/SerializeWrite.java
index 17d2385..89bcf4f 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/fast/SerializeWrite.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/fast/SerializeWrite.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.hive.serde2.fast;
 import java.io.IOException;
 import java.sql.Date;
 import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
@@ -154,4 +156,32 @@ public interface SerializeWrite {
    */
   void writeHiveDecimal(HiveDecimal dec, int scale) throws IOException;
   void writeHiveDecimal(HiveDecimalWritable decWritable, int scale) throws IOException;
+
+  /*
+   * LIST.
+   */
+  void beginList(List list);
+  void separateList();
+  void finishList();
+
+  /*
+   * MAP.
+   */
+  void beginMap(Map<?, ?> map);
+  void separateKey();
+  void separateKeyValuePair();
+  void finishMap();
+
+  /*
+   * STRUCT.
+   */
+  void beginStruct(List fieldValues);
+  void separateStruct();
+  void finishStruct();
+
+  /*
+   * UNION.
+   */
+  void beginUnion(int tag) throws IOException;
+  void finishUnion();
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d467e172/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java b/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java
index af00a30..6866d49 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.serde2.io;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.sql.Timestamp;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
@@ -133,7 +132,8 @@ public class TimestampWritable implements WritableComparable<TimestampWritable>
       timestamp.setNanos(0);
       return;
     }
-    this.timestamp = t;
+    timestamp.setTime(t.getTime());
+    timestamp.setNanos(t.getNanos());
     bytesEmpty = true;
     timestampEmpty = false;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/d467e172/serde/src/java/org/apache/hadoop/hive/serde2/lazy/VerifyLazy.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/lazy/VerifyLazy.java b/serde/src/java/org/apache/hadoop/hive/serde2/lazy/VerifyLazy.java
new file mode 100644
index 0000000..324f5b8
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/lazy/VerifyLazy.java
@@ -0,0 +1,444 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.lazy;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
+import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
+import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.HiveIntervalDayTimeWritable;
+import org.apache.hadoop.hive.serde2.io.HiveIntervalYearMonthWritable;
+import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryArray;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryMap;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryStruct;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUnion;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObject;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * VerifyLazy: compares a deserialized (lazy) object against the expected object, throwing on mismatch.
+ *
+ */
+public class VerifyLazy {
+
+  public static boolean lazyCompareList(ListTypeInfo listTypeInfo, List<Object> list, List<Object> expectedList) {
+    TypeInfo elementTypeInfo = listTypeInfo.getListElementTypeInfo();
+    final int size = list.size();
+    for (int i = 0; i < size; i++) {
+      Object lazyEleObj = list.get(i);
+      Object expectedEleObj = expectedList.get(i);
+      if (!lazyCompare(elementTypeInfo, lazyEleObj, expectedEleObj)) {
+        throw new RuntimeException("List element deserialized value does not match elementTypeInfo " + elementTypeInfo.toString());
+      }
+    }
+    return true;
+  }
+
+  public static boolean lazyCompareMap(MapTypeInfo mapTypeInfo, Map<Object, Object> map, Map<Object, Object> expectedMap) {
+    TypeInfo keyTypeInfo = mapTypeInfo.getMapKeyTypeInfo();
+    TypeInfo valueTypeInfo = mapTypeInfo.getMapValueTypeInfo();
+    if (map.size() != expectedMap.size()) {
+      throw new RuntimeException("Map key/value deserialized map.size() " + map.size() + " map " + map.toString() +
+          " expectedMap.size() " + expectedMap.size() + " expectedMap " + expectedMap.toString() +
+          " does not match keyTypeInfo " + keyTypeInfo.toString() + " valueTypeInfo " + valueTypeInfo.toString());
+    }
+    return true;
+  }
+
+  public static boolean lazyCompareStruct(StructTypeInfo structTypeInfo, List<Object> fields, List<Object> expectedFields) {
+    ArrayList<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+    final int size = fieldTypeInfos.size();
+    for (int i = 0; i < size; i++) {
+      Object lazyEleObj = fields.get(i);
+      Object expectedEleObj = expectedFields.get(i);
+      if (!lazyCompare(fieldTypeInfos.get(i), lazyEleObj, expectedEleObj)) {
+        throw new RuntimeException("SerDe deserialized value does not match");
+      }
+    }
+    return true;
+  }
+
+  public static boolean lazyCompareUnion(UnionTypeInfo unionTypeInfo, LazyBinaryUnion union, UnionObject expectedUnion) {
+    byte tag = union.getTag();
+    byte expectedTag = expectedUnion.getTag();
+    if (tag != expectedTag) {
+      throw new RuntimeException("Union tag does not match union.getTag() " + tag + " expectedUnion.getTag() " + expectedTag);
+    }
+    return lazyCompare(unionTypeInfo.getAllUnionObjectTypeInfos().get(tag),
+        union.getField(), expectedUnion.getObject());
+  }
+
+  public static boolean lazyCompareUnion(UnionTypeInfo unionTypeInfo, LazyUnion union, UnionObject expectedUnion) {
+    byte tag = union.getTag();
+    byte expectedTag = expectedUnion.getTag();
+    if (tag != expectedTag) {
+      throw new RuntimeException("Union tag does not match union.getTag() " + tag + " expectedUnion.getTag() " + expectedTag);
+    }
+    return lazyCompare(unionTypeInfo.getAllUnionObjectTypeInfos().get(tag),
+        union.getField(), expectedUnion.getObject());
+  }
+
+  public static boolean lazyCompareUnion(UnionTypeInfo unionTypeInfo, UnionObject union, UnionObject expectedUnion) {
+    byte tag = union.getTag();
+    byte expectedTag = expectedUnion.getTag();
+    if (tag != expectedTag) {
+      throw new RuntimeException("Union tag does not match union.getTag() " + tag +
+          " expectedUnion.getTag() " + expectedTag);
+    }
+    return lazyCompare(unionTypeInfo.getAllUnionObjectTypeInfos().get(tag),
+        union.getObject(), expectedUnion.getObject());
+  }
+
+  public static boolean lazyCompare(TypeInfo typeInfo, Object lazyObject, Object expectedObject) {
+    if (expectedObject == null) {
+      if (lazyObject != null) {
+        throw new RuntimeException("Expected object is null but object is not null " + lazyObject.toString() +
+            " typeInfo " + typeInfo.toString());
+      }
+      return true;
+    } else if (lazyObject == null) {
+      throw new RuntimeException("Expected object is not null \"" + expectedObject.toString() +
+          "\" typeInfo " + typeInfo.toString() + " but object is null");
+    }
+    if (lazyObject instanceof Writable) {
+      if (!lazyObject.equals(expectedObject)) {
+        throw new RuntimeException("Expected object " + expectedObject.toString() +
+            " and actual object " + lazyObject.toString() + " is not equal typeInfo " + typeInfo.toString());
+      }
+      return true;
+    }
+    if (lazyObject instanceof LazyPrimitive) {
+      Object primitiveObject = ((LazyPrimitive) lazyObject).getObject();
+      PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
+      switch (primitiveTypeInfo.getPrimitiveCategory()) {
+      case BOOLEAN:
+        {
+          if (!(primitiveObject instanceof LazyBoolean)) {
+            throw new RuntimeException("Expected LazyBoolean");
+          }
+          boolean value = ((LazyBoolean) primitiveObject).getWritableObject().get();
+          boolean expected = ((BooleanWritable) expectedObject).get();
+          if (value != expected) {
+            throw new RuntimeException("Boolean field mismatch (expected " + expected + " found " + value + ")");
+          }
+        }
+        break;
+      case BYTE:
+        {
+          if (!(primitiveObject instanceof LazyByte)) {
+            throw new RuntimeException("Expected LazyByte");
+          }
+          byte value = ((LazyByte) primitiveObject).getWritableObject().get();
+          byte expected = ((ByteWritable) expectedObject).get();
+          if (value != expected) {
+            throw new RuntimeException("Byte field mismatch (expected " + (int) expected + " found " + (int) value + ")");
+          }
+        }
+        break;
+      case SHORT:
+        {
+          if (!(primitiveObject instanceof LazyShort)) {
+            throw new RuntimeException("Expected LazyShort");
+          }
+          short value = ((LazyShort) primitiveObject).getWritableObject().get();
+          short expected = ((ShortWritable) expectedObject).get();
+          if (value != expected) {
+            throw new RuntimeException("Short field mismatch (expected " + expected + " found " + value + ")");
+          }
+        }
+        break;
+      case INT:
+        {
+          if (!(primitiveObject instanceof LazyInteger)) {
+            throw new RuntimeException("Expected LazyInteger");
+          }
+          int value = ((LazyInteger) primitiveObject).getWritableObject().get();
+          int expected = ((IntWritable) expectedObject).get();
+          if (value != expected) {
+            throw new RuntimeException("Int field mismatch (expected " + expected + " found " + value + ")");
+          }
+        }
+        break;
+      case LONG:
+        {
+          if (!(primitiveObject instanceof LazyLong)) {
+            throw new RuntimeException("Expected LazyLong");
+          }
+          long value = ((LazyLong) primitiveObject).getWritableObject().get();
+          long expected = ((LongWritable) expectedObject).get();
+          if (value != expected) {
+            throw new RuntimeException("Long field mismatch (expected " + expected + " found " + value + ")");
+          }
+        }
+        break;
+      case FLOAT:
+        {
+          if (!(primitiveObject instanceof LazyFloat)) {
+            throw new RuntimeException("Expected LazyFloat");
+          }
+          float value = ((LazyFloat) primitiveObject).getWritableObject().get();
+          float expected = ((FloatWritable) expectedObject).get();
+          if (value != expected) {
+            throw new RuntimeException("Float field mismatch (expected " + expected + " found " + value + ")");
+          }
+        }
+        break;
+      case DOUBLE:
+        {
+          if (!(primitiveObject instanceof LazyDouble)) {
+            throw new RuntimeException("Expected LazyDouble");
+          }
+          double value = ((LazyDouble) primitiveObject).getWritableObject().get();
+          double expected = ((DoubleWritable) expectedObject).get();
+          if (value != expected) {
+            throw new RuntimeException("Double field mismatch (expected " + expected + " found " + value + ")");
+          }
+        }
+        break;
+      case STRING:
+        {
+          if (!(primitiveObject instanceof LazyString)) {
+            throw new RuntimeException("Text expected writable not Text");
+          }
+          Text value = ((LazyString) primitiveObject).getWritableObject();
+          Text expected = ((Text) expectedObject);
+          if (!value.equals(expected)) {
+            throw new RuntimeException("String field mismatch (expected '" + expected + "' found '" + value + "')");
+          }
+        }
+        break;
+      case CHAR:
+        {
+          if (!(primitiveObject instanceof LazyHiveChar)) {
+            throw new RuntimeException("Expected LazyHiveChar");
+          }
+          HiveChar value = ((LazyHiveChar) primitiveObject).getWritableObject().getHiveChar();
+          HiveChar expected = ((HiveCharWritable) expectedObject).getHiveChar();
+  
+          if (!value.equals(expected)) {
+            throw new RuntimeException("HiveChar field mismatch (expected '" + expected + "' found '" + value + "')");
+          }
+        }
+        break;
+      case VARCHAR:
+        {
+          if (!(primitiveObject instanceof LazyHiveVarchar)) {
+            throw new RuntimeException("Expected LazyHiveVarchar");
+          }
+          HiveVarchar value = ((LazyHiveVarchar) primitiveObject).getWritableObject().getHiveVarchar();
+          HiveVarchar expected = ((HiveVarcharWritable) expectedObject).getHiveVarchar();
+  
+          if (!value.equals(expected)) {
+            throw new RuntimeException("HiveVarchar field mismatch (expected '" + expected + "' found '" + value + "')");
+          }
+        }
+        break;
+      case DECIMAL:
+        {
+          if (!(primitiveObject instanceof LazyHiveDecimal)) {
+            throw new RuntimeException("Expected LazyDecimal");
+          }
+          HiveDecimal value = ((LazyHiveDecimal) primitiveObject).getWritableObject().getHiveDecimal();
+          HiveDecimal expected = ((HiveDecimalWritable) expectedObject).getHiveDecimal();
+  
+          if (!value.equals(expected)) {
+            DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) primitiveTypeInfo;
+            int precision = decimalTypeInfo.getPrecision();
+            int scale = decimalTypeInfo.getScale();
+            throw new RuntimeException("Decimal field mismatch (expected " + expected.toString() +
+                " found " + value.toString() + ") precision " + precision + ", scale " + scale);
+          }
+        }
+        break;
+      case DATE:
+        {
+          if (!(primitiveObject instanceof LazyDate)) {
+            throw new RuntimeException("Expected LazyDate");
+          }
+          Date value = ((LazyDate) primitiveObject).getWritableObject().get();
+          Date expected = ((DateWritable) expectedObject).get();
+          if (!value.equals(expected)) {
+            throw new RuntimeException("Date field mismatch (expected " + expected + " found " + value + ")");
+          }
+        }
+        break;
+      case TIMESTAMP:
+        {
+          if (!(primitiveObject instanceof LazyTimestamp)) {
+            throw new RuntimeException("TimestampWritable expected writable not TimestampWritable");
+          }
+          Timestamp value = ((LazyTimestamp) primitiveObject).getWritableObject().getTimestamp();
+          Timestamp expected = ((TimestampWritable) expectedObject).getTimestamp();
+          if (!value.equals(expected)) {
+            throw new RuntimeException("Timestamp field mismatch (expected " + expected + " found " + value + ")");
+          }
+        }
+        break;
+      case INTERVAL_YEAR_MONTH:
+        {
+          if (!(primitiveObject instanceof LazyHiveIntervalYearMonth)) {
+            throw new RuntimeException("Expected LazyHiveIntervalYearMonth");
+          }
+          HiveIntervalYearMonth value = ((LazyHiveIntervalYearMonth) primitiveObject).getWritableObject().getHiveIntervalYearMonth();
+          HiveIntervalYearMonth expected = ((HiveIntervalYearMonthWritable) expectedObject).getHiveIntervalYearMonth();
+          if (!value.equals(expected)) {
+            throw new RuntimeException("HiveIntervalYearMonth field mismatch (expected " + expected + " found " + value + ")");
+          }
+        }
+        break;
+      case INTERVAL_DAY_TIME:
+        {
+          if (!(primitiveObject instanceof LazyHiveIntervalDayTime)) {
+            throw new RuntimeException("Expected writable LazyHiveIntervalDayTime");
+          }
+          HiveIntervalDayTime value = ((LazyHiveIntervalDayTime) primitiveObject).getWritableObject().getHiveIntervalDayTime();
+          HiveIntervalDayTime expected = ((HiveIntervalDayTimeWritable) expectedObject).getHiveIntervalDayTime();
+          if (!value.equals(expected)) {
+            throw new RuntimeException("HiveIntervalDayTime field mismatch (expected " + expected + " found " + value + ")");
+          }
+        }
+        break;
+      case BINARY:
+        {
+          if (!(primitiveObject instanceof LazyBinary)) {
+            throw new RuntimeException("Expected LazyBinary");
+          }
+          BytesWritable bytesWritable = ((LazyBinary) primitiveObject).getWritableObject();
+          byte[] value = Arrays.copyOfRange(bytesWritable.getBytes(), 0, bytesWritable.getLength());
+          BytesWritable bytesWritableExpected = (BytesWritable) expectedObject;
+          byte[] expected = Arrays.copyOfRange(bytesWritableExpected.getBytes(), 0, bytesWritableExpected.getLength());
+          if (value.length != expected.length){
+            throw new RuntimeException("Byte Array field mismatch (expected " + Arrays.toString(expected)
+                + " found " + Arrays.toString(value) + ")");
+          }
+          for (int b = 0; b < value.length; b++) {
+            if (value[b] != expected[b]) {
+              throw new RuntimeException("Byte Array field mismatch (expected " + Arrays.toString(expected)
+                + " found " + Arrays.toString(value) + ")");
+            }
+          }
+        }
+        break;
+      default:
+        throw new Error("Unknown primitive category " + primitiveTypeInfo.getPrimitiveCategory());
+      }
+    } else if (lazyObject instanceof LazyArray) {
+      LazyArray lazyArray = (LazyArray) lazyObject;
+      List<Object> list = lazyArray.getList();
+      List<Object> expectedList = (List<Object>) expectedObject;
+      ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
+      if (list.size() != expectedList.size()) {
+        throw new RuntimeException("SerDe deserialized list length does not match (list " +
+            list.toString() + " list.size() " + list.size() + " expectedList " + expectedList.toString() +
+            " expectedList.size() " + expectedList.size() + ")" +
+            " elementTypeInfo " + listTypeInfo.getListElementTypeInfo().toString());
+      }
+      return lazyCompareList((ListTypeInfo) typeInfo, list, expectedList);
+    } else if (typeInfo instanceof ListTypeInfo) {
+      List<Object> list;
+      if (lazyObject instanceof LazyBinaryArray) {
+        list = ((LazyBinaryArray) lazyObject).getList();
+      } else {
+        list = (List<Object>) lazyObject;
+      }
+      List<Object> expectedList = (List<Object>) expectedObject;
+      if (list.size() != expectedList.size()) {
+        throw new RuntimeException("SerDe deserialized list length does not match (list " +
+            list.toString() + " list.size() " + list.size() + " expectedList " + expectedList.toString() +
+            " expectedList.size() " + expectedList.size() + ")");
+      }
+      return lazyCompareList((ListTypeInfo) typeInfo, list, expectedList);
+    } else if (lazyObject instanceof LazyMap) {
+      LazyMap lazyMap = (LazyMap) lazyObject;
+      Map<Object, Object> map = lazyMap.getMap();
+      Map<Object, Object> expectedMap = (Map<Object, Object>) expectedObject;
+      return lazyCompareMap((MapTypeInfo) typeInfo, map, expectedMap);
+    } else if (typeInfo instanceof MapTypeInfo) {
+      Map<Object, Object> map;
+      Map<Object, Object> expectedMap = (Map<Object, Object>) expectedObject;
+      if (lazyObject instanceof LazyBinaryMap) {
+        map = ((LazyBinaryMap) lazyObject).getMap();
+      } else {
+        map = (Map<Object, Object>) lazyObject;
+      }
+      return lazyCompareMap((MapTypeInfo) typeInfo, map, expectedMap);
+    } else if (lazyObject instanceof LazyStruct) {
+      LazyStruct lazyStruct = (LazyStruct) lazyObject;
+      List<Object> fields = lazyStruct.getFieldsAsList();
+      List<Object> expectedFields = (List<Object>) expectedObject;
+      StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+      return lazyCompareStruct(structTypeInfo, fields, expectedFields);
+    } else if (typeInfo instanceof StructTypeInfo) {
+      ArrayList<Object> fields;
+      if (lazyObject instanceof LazyBinaryStruct) {
+        fields = ((LazyBinaryStruct) lazyObject).getFieldsAsList();
+      } else {
+        fields = (ArrayList<Object>) lazyObject;
+      }
+      List<Object> expectedFields = (List<Object>) expectedObject;
+      StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+      return lazyCompareStruct(structTypeInfo, fields, expectedFields);
+    } else if (lazyObject instanceof LazyUnion) {
+      LazyUnion union = (LazyUnion) lazyObject;
+      StandardUnionObjectInspector.StandardUnion expectedUnion = (StandardUnionObjectInspector.StandardUnion) expectedObject;
+      UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
+      return lazyCompareUnion(unionTypeInfo, union, expectedUnion);
+    } else if (typeInfo instanceof UnionTypeInfo) {
+      StandardUnionObjectInspector.StandardUnion expectedUnion = (StandardUnionObjectInspector.StandardUnion) expectedObject;
+      UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
+      if (lazyObject instanceof LazyBinaryUnion) {
+        return lazyCompareUnion(unionTypeInfo, (LazyBinaryUnion) lazyObject, expectedUnion);
+      } else {
+        return lazyCompareUnion(unionTypeInfo, (UnionObject) lazyObject, expectedUnion);
+      }
+    } else {
+      System.err.println("Not implemented " + typeInfo.getClass().getName());
+    }
+    return true;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/d467e172/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java b/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java
index 606b246..64e316b 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java
@@ -19,11 +19,11 @@
 package org.apache.hadoop.hive.serde2.lazy.fast;
 
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.StandardCharsets;
 import java.sql.Date;
 import java.util.Arrays;
+import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,12 +37,21 @@ import org.apache.hadoop.hive.serde2.lazy.LazyLong;
 import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters;
 import org.apache.hadoop.hive.serde2.lazy.LazyShort;
 import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
 import org.apache.hadoop.io.Text;
 import org.apache.hive.common.util.TimestampParser;
 
+import com.google.common.base.Preconditions;
+
 /*
  * Directly deserialize with the caller reading field-by-field the LazySimple (text)
  * serialization format.
@@ -61,9 +70,123 @@ import org.apache.hive.common.util.TimestampParser;
 public final class LazySimpleDeserializeRead extends DeserializeRead {
   public static final Logger LOG = LoggerFactory.getLogger(LazySimpleDeserializeRead.class.getName());
 
-  private int[] startPosition;
+  /*
+   * Describes one field: either a top-level column or a member nested inside a complex type.
+   * Having a single descriptor lets the read path treat both cases the same way.
+   */
+  private static class Field {
+
+    // The declared type of this field.
+    public final TypeInfo typeInfo;
+
+    // Fast-path flag for the most common case -- a primitive field.
+    public final boolean isPrimitive;
+
+    // Set only for primitive fields; null otherwise.
+    public final PrimitiveCategory primitiveCategory;
+
+    // Set only for non-primitive fields; null otherwise.
+    public final Category complexCategory;
+
+    // Starts out null; addComplexTypeHelper assigns it for non-primitive fields.
+    public ComplexTypeHelper complexTypeHelper;
+
+    public Field(TypeInfo typeInfo) {
+      this.typeInfo = typeInfo;
+
+      final Category fieldCategory = typeInfo.getCategory();
+      final boolean primitive = (fieldCategory == Category.PRIMITIVE);
+      this.isPrimitive = primitive;
+      this.primitiveCategory =
+          primitive ? ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory() : null;
+      this.complexCategory = primitive ? null : fieldCategory;
+      this.complexTypeHelper = null;
+    }
+  }
+
+  /*
+   * Base helper that keeps the start, length and end of one complex type field, plus a running position.
+   * NOTE: The top level uses startPositions instead.
+   */
+  private static class ComplexTypeHelper {
+    // The complex field this helper was created for.
+    public final Field complexField;
+    // Extent of the complex value as last recorded by setCurrentFieldInfo (end = start + length).
+    public int complexFieldStart;
+    public int complexFieldLength;
+    public int complexFieldEnd;
+    // Running position; setCurrentFieldInfo resets it to complexFieldStart.
+    public int fieldPosition;
+
+    public ComplexTypeHelper(Field complexField) {
+      this.complexField = complexField;
+    }
+    // Records the extent of the complex value and rewinds fieldPosition to its first position.
+    public void setCurrentFieldInfo(int complexFieldStart, int complexFieldLength) {
+      this.complexFieldStart = complexFieldStart;
+      this.complexFieldLength = complexFieldLength;
+      complexFieldEnd = complexFieldStart + complexFieldLength;
+      fieldPosition = complexFieldStart;
+    }
+  }
+
+  private static class ListComplexTypeHelper extends ComplexTypeHelper {
+    // Helper for a LIST field: the base position tracking plus the descriptor of the element type.
+    public Field elementField;
+
+    public ListComplexTypeHelper(Field complexField, Field elementField) {
+      super(complexField);
+      this.elementField = elementField;
+    }
+  }
+
+  private static class MapComplexTypeHelper extends ComplexTypeHelper {
+    // Helper for a MAP field: the base position tracking plus descriptors for the key and value types.
+    public Field keyField;
+    public Field valueField;
+    // Starts as false. NOTE(review): presumably records that the current entry's key was already read -- confirm in the read path.
+    public boolean fieldHaveParsedKey;
+
+    public MapComplexTypeHelper(Field complexField, Field keyField, Field valueField) {
+      super(complexField);
+      this.keyField = keyField;
+      this.valueField = valueField;
+      fieldHaveParsedKey = false;
+    }
+  }
+
+  private static class StructComplexTypeHelper extends ComplexTypeHelper {
+    // Helper for a STRUCT field: the base position tracking plus one descriptor per struct member.
+    public Field[] fields;
+    // Starts at 0. NOTE(review): presumably the index into fields of the next member to read -- confirm in the read path.
+    public int nextFieldIndex;
+
+    public StructComplexTypeHelper(Field complexField, Field[] fields) {
+      super(complexField);
+      this.fields = fields;
+      nextFieldIndex = 0;
+    }
+  }
+
+  private static class UnionComplexTypeHelper extends ComplexTypeHelper {
+    // Helper for a UNION field: a descriptor for the tag plus one descriptor per union alternative.
+    public Field tagField;
+    public Field[] fields;
+    // Flag starts as false. NOTE(review): presumably set once the tag was read, with the value kept in fieldTag -- confirm.
+    public boolean fieldHaveParsedTag;
+    public int fieldTag;
 
-  private final byte separator;
+    public UnionComplexTypeHelper(Field complexField, Field[] fields) {
+      super(complexField);
+      this.tagField = new Field(TypeInfoFactory.intTypeInfo);  // the tag is described as an int field
+      this.fields = fields;
+      fieldHaveParsedTag = false;
+    }
+  }
+
+  private int[] startPositions;
+
+  private final byte[] separators;
   private final boolean isEscaped;
   private final byte escapeChar;
   private final int[] escapeCounts;
@@ -71,19 +194,25 @@ public final class LazySimpleDeserializeRead extends DeserializeRead {
   private final boolean isExtendedBooleanLiteral;
 
   private final int fieldCount;
+  private final Field[] fields;
+  private final int maxLevelDepth;
 
   private byte[] bytes;
   private int start;
   private int end;
-  private boolean parsed;
+  private boolean topLevelParsed;
 
   // Used by readNextField/skipNextField and not by readField.
   private int nextFieldIndex;
 
   // For getDetailedReadPositionString.
-  private int currentFieldIndex;
+  private int currentLevel;
+  private int currentTopLevelFieldIndex;
   private int currentFieldStart;
   private int currentFieldLength;
+  private int currentEscapeCount;
+
+  private ComplexTypeHelper[] currentComplexTypeHelpers;
 
   // For string/char/varchar buffering when there are escapes.
   private int internalBufferLen;
@@ -93,21 +222,112 @@ public final class LazySimpleDeserializeRead extends DeserializeRead {
 
   private boolean isEndOfInputReached;
 
+  private int addComplexFields(List<TypeInfo> fieldTypeInfoList, Field[] fields, int depth) {
+    // Build one Field per member type, keeping the largest depth reported by any complex member.
+    int memberIndex = 0;
+    for (TypeInfo memberTypeInfo : fieldTypeInfoList) {
+      final Field member = new Field(memberTypeInfo);
+      if (!member.isPrimitive) {
+        depth = Math.max(depth, addComplexTypeHelper(member, depth));
+      }
+      fields[memberIndex++] = member;
+    }
+    return depth;
+  }
+
+  private int addComplexTypeHelper(Field complexField, int depth) {
+    // Creates and attaches the helper for complexField, recursing into nested complex types; returns the resulting depth.
+    // Assume one separator (depth) needed.
+    depth++;
+
+    switch (complexField.complexCategory) {
+    case LIST:
+      {
+        final ListTypeInfo listTypeInfo = (ListTypeInfo) complexField.typeInfo;
+        final Field elementField = new Field(listTypeInfo.getListElementTypeInfo());
+        if (!elementField.isPrimitive) {
+          depth = addComplexTypeHelper(elementField, depth);  // a complex element adds its own levels
+        }
+        final ListComplexTypeHelper listHelper =
+            new ListComplexTypeHelper(complexField, elementField);
+        complexField.complexTypeHelper = listHelper;
+      }
+      break;
+    case MAP:
+      {
+        // Map needs two separators (key and key/value pair).
+        depth++;
+
+        final MapTypeInfo mapTypeInfo = (MapTypeInfo) complexField.typeInfo;
+        final Field keyField = new Field(mapTypeInfo.getMapKeyTypeInfo());
+        if (!keyField.isPrimitive) {
+          depth = Math.max(depth, addComplexTypeHelper(keyField, depth));
+        }
+        final Field valueField = new Field(mapTypeInfo.getMapValueTypeInfo());
+        if (!valueField.isPrimitive) {
+          // NOTE(review): depth passed here already includes any increase from a complex key, so the result can exceed the true nesting depth.
+          depth = Math.max(depth, addComplexTypeHelper(valueField, depth));
+        }
+        final MapComplexTypeHelper mapHelper =
+            new MapComplexTypeHelper(complexField, keyField, valueField);
+        complexField.complexTypeHelper = mapHelper;
+      }
+      break;
+    case STRUCT:
+      {
+        final StructTypeInfo structTypeInfo = (StructTypeInfo) complexField.typeInfo;
+        final List<TypeInfo> fieldTypeInfoList = structTypeInfo.getAllStructFieldTypeInfos();
+        final Field[] fields = new Field[fieldTypeInfoList.size()];
+        depth = addComplexFields(fieldTypeInfoList, fields, depth);  // fills fields and folds in member depths
+        final StructComplexTypeHelper structHelper =
+            new StructComplexTypeHelper(complexField, fields);
+        complexField.complexTypeHelper = structHelper;
+      }
+      break;
+    case UNION:
+      {
+        final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) complexField.typeInfo;
+        final List<TypeInfo> fieldTypeInfoList = unionTypeInfo.getAllUnionObjectTypeInfos();
+        final Field[] fields = new Field[fieldTypeInfoList.size()];
+        depth = addComplexFields(fieldTypeInfoList, fields, depth);  // one descriptor per union alternative
+        final UnionComplexTypeHelper structHelper =
+            new UnionComplexTypeHelper(complexField, fields);  // despite the local's name, this is the union helper
+        complexField.complexTypeHelper = structHelper;
+      }
+      break;
+    default:
+      throw new Error("Unexpected complex category " + complexField.complexCategory);
+    }
+    return depth;
+  }
+
   public LazySimpleDeserializeRead(TypeInfo[] typeInfos, boolean useExternalBuffer,
-      byte separator, LazySerDeParameters lazyParams) {
+      LazySerDeParameters lazyParams) {
     super(typeInfos, useExternalBuffer);
 
-    fieldCount = typeInfos.length;
+    final int count = typeInfos.length;
+    fieldCount = count;
+    int depth = 0;
+    fields = new Field[count];
+    Field field;
+    for (int i = 0; i < count; i++) {
+      field = new Field(typeInfos[i]);
+      if (!field.isPrimitive) {
+        depth = Math.max(depth, addComplexTypeHelper(field, 0));
+      }
+      fields[i] = field;
+    }
+    maxLevelDepth = depth;
+    currentComplexTypeHelpers = new ComplexTypeHelper[depth];
 
     // Field length is difference between positions hence one extra.
-    startPosition = new int[fieldCount + 1];
+    startPositions = new int[count + 1];
 
-    this.separator = separator;
+    this.separators = lazyParams.getSeparators();
 
     isEscaped = lazyParams.isEscaped();
     if (isEscaped) {
       escapeChar = lazyParams.getEscapeChar();
-      escapeCounts = new int[fieldCount];
+      escapeCounts = new int[count];
     } else {
       escapeChar = (byte) 0;
       escapeCounts = null;
@@ -123,11 +343,6 @@ public final class LazySimpleDeserializeRead extends DeserializeRead {
     internalBufferLen = -1;
   }
 
-  public LazySimpleDeserializeRead(TypeInfo[] typeInfos, boolean useExternalBuffer,
-      LazySerDeParameters lazyParams) {
-    this(typeInfos, useExternalBuffer, lazyParams.getSeparators()[0], lazyParams);
-  }
-
   /*
    * Set the range of bytes to be deserialized.
    */
@@ -136,7 +351,8 @@ public final class LazySimpleDeserializeRead extends DeserializeRead {
     this.bytes = bytes;
     start = offset;
     end = offset + length;
-    parsed = false;
+    topLevelParsed = false;
+    currentLevel = 0;
     nextFieldIndex = -1;
   }
 
@@ -157,14 +373,15 @@ public final class LazySimpleDeserializeRead extends DeserializeRead {
     sb.append(" fields with types ");
     sb.append(Arrays.toString(typeInfos));
     sb.append(".  ");
-    if (!parsed) {
+    if (!topLevelParsed) {
       sb.append("Error during field separator parsing");
     } else {
       sb.append("Read field #");
-      sb.append(currentFieldIndex);
+      sb.append(currentTopLevelFieldIndex);
       sb.append(" at field start position ");
-      sb.append(startPosition[currentFieldIndex]);
-      int currentFieldLength = startPosition[currentFieldIndex + 1] - startPosition[currentFieldIndex] - 1;
+      sb.append(startPositions[currentTopLevelFieldIndex]);
+      int currentFieldLength = startPositions[currentTopLevelFieldIndex + 1] -
+          startPositions[currentTopLevelFieldIndex] - 1;
       sb.append(" for field length ");
       sb.append(currentFieldLength);
     }
@@ -178,15 +395,15 @@ public final class LazySimpleDeserializeRead extends DeserializeRead {
    * This is an adapted version of the parse method in the LazyStruct class.
    * They should parse things the same way.
    */
-  private void parse() {
+  private void topLevelParse() {
 
     int fieldId = 0;
     int fieldByteBegin = start;
     int fieldByteEnd = start;
 
-    final byte separator = this.separator;
+    final byte separator = this.separators[0];
     final int fieldCount = this.fieldCount;
-    final int[] startPosition = this.startPosition;
+    final int[] startPositions = this.startPositions;
     final byte[] bytes = this.bytes;
     final int end = this.end;
 
@@ -196,7 +413,7 @@ public final class LazySimpleDeserializeRead extends DeserializeRead {
     if (!isEscaped) {
       while (fieldByteEnd < end) {
         if (bytes[fieldByteEnd] == separator) {
-          startPosition[fieldId++] = fieldByteBegin;
+          startPositions[fieldId++] = fieldByteBegin;
           if (fieldId == fieldCount) {
             break;
           }
@@ -207,7 +424,7 @@ public final class LazySimpleDeserializeRead extends DeserializeRead {
       }
       // End serves as final separator.
       if (fieldByteEnd == end && fieldId < fieldCount) {
-        startPosition[fieldId++] = fieldByteBegin;
+        startPositions[fieldId++] = fieldByteBegin;
       }
     } else {
       final byte escapeChar = this.escapeChar;
@@ -219,7 +436,7 @@ public final class LazySimpleDeserializeRead extends DeserializeRead {
         if (bytes[fieldByteEnd] == separator) {
           escapeCounts[fieldId] = escapeCount;
           escapeCount = 0;
-          startPosition[fieldId++] = fieldByteBegin;
+          startPositions[fieldId++] = fieldByteBegin;
           if (fieldId == fieldCount) {
             break;
           }
@@ -237,7 +454,7 @@ public final class LazySimpleDeserializeRead extends DeserializeRead {
         if (bytes[fieldByteEnd] == separator) {
           escapeCounts[fieldId] = escapeCount;
           escapeCount = 0;
-          startPosition[fieldId++] = fieldByteBegin;
+          startPositions[fieldId++] = fieldByteBegin;
           if (fieldId <= fieldCount) {
             fieldByteBegin = ++fieldByteEnd;
           }
@@ -248,23 +465,66 @@ public final class LazySimpleDeserializeRead extends DeserializeRead {
       // End serves as final separator.
       if (fieldByteEnd == end && fieldId < fieldCount) {
         escapeCounts[fieldId] = escapeCount;
-        startPosition[fieldId++] = fieldByteBegin;
+        startPositions[fieldId++] = fieldByteBegin;
       }
     }
 
     if (fieldId == fieldCount || fieldByteEnd == end) {
       // All fields have been parsed, or bytes have been parsed.
-      // We need to set the startPosition of fields.length to ensure we
+      // We need to set the startPositions of fields.length to ensure we
       // can use the same formula to calculate the length of each field.
       // For missing fields, their starting positions will all be the same,
       // which will make their lengths to be -1 and uncheckedGetField will
       // return these fields as NULLs.
-      Arrays.fill(startPosition, fieldId, startPosition.length, fieldByteEnd + 1);
+      Arrays.fill(startPositions, fieldId, startPositions.length, fieldByteEnd + 1);
     }
 
     isEndOfInputReached = (fieldByteEnd == end);
   }
 
+  private int parseComplexField(int start, int end, int level) {
+    // Scans bytes from start up to end for separators[level]; returns where the scan stopped and sets currentEscapeCount.
+    final byte separator = separators[level];
+    int fieldByteEnd = start;
+
+    final byte[] bytes = this.bytes;
+
+    currentEscapeCount = 0;  // reset; only the escaped path below can leave a non-zero count
+    if (!isEscaped) {
+      while (fieldByteEnd < end) {
+        if (bytes[fieldByteEnd] == separator) {
+          return fieldByteEnd;  // index of the separator byte itself
+        }
+        fieldByteEnd++;
+      }
+    } else {
+      final byte escapeChar = this.escapeChar;
+      final int endLessOne = end - 1;
+      int escapeCount = 0;
+      // Process the bytes that can be escaped (the last one can't be).
+      while (fieldByteEnd < endLessOne) {
+        if (bytes[fieldByteEnd] == separator) {
+          currentEscapeCount = escapeCount;
+          return fieldByteEnd;
+        } else if (bytes[fieldByteEnd] == escapeChar) {
+          // Skip the escape char and the byte after it, so an escaped separator does not end the field.
+          fieldByteEnd += 2;
+          escapeCount++;
+        } else {
+          fieldByteEnd++;
+        }
+      }
+      // Process the last byte (not reached when an escape pair already consumed it).
+      if (fieldByteEnd == endLessOne) {
+        if (bytes[fieldByteEnd] != separator) {
+          fieldByteEnd++;
+        }
+      }
+      currentEscapeCount = escapeCount;
+    }
+    return fieldByteEnd;  // no separator hit inside a loop: either a trailing separator's index or past the last byte
+  }
+
   /*
    * Reads the next field.
    *
@@ -291,9 +551,9 @@ public final class LazySimpleDeserializeRead extends DeserializeRead {
    * Designed for skipping columns that are not included.
    */
   public void skipNextField() throws IOException {
-    if (!parsed) {
-      parse();
-      parsed = true;
+    if (!topLevelParsed) {
+      topLevelParse();
+      topLevelParsed = true;
     }
     if (nextFieldIndex + 1 >= fieldCount) {
       // No more.
@@ -341,17 +601,26 @@ public final class LazySimpleDeserializeRead extends DeserializeRead {
    */
   public boolean readField(int fieldIndex) throws IOException {
 
-    if (!parsed) {
-      parse();
-      parsed = true;
+    Preconditions.checkState(currentLevel == 0);  // throws unless currentLevel is 0 (setBytes resets it to 0)
+    // Locate the top-level field boundaries lazily, the first time a field is requested.
+    if (!topLevelParsed) {
+      topLevelParse();
+      topLevelParsed = true;
     }
 
-    currentFieldIndex = fieldIndex;
+    // Top level: remember which field is being read, then publish its start, length and escape count.
+    currentTopLevelFieldIndex = fieldIndex;
 
-    final int fieldStart = startPosition[fieldIndex];
-    currentFieldStart = fieldStart;
-    final int fieldLength = startPosition[fieldIndex + 1] - startPosition[fieldIndex] - 1;
-    currentFieldLength = fieldLength;
+    currentFieldStart = startPositions[fieldIndex];
+    currentFieldLength = startPositions[fieldIndex + 1] - startPositions[fieldIndex] - 1;  // less one for the separator
+    currentEscapeCount = (isEscaped ? escapeCounts[fieldIndex] : 0);
+    // Decode the value using this field's descriptor.
+    return doReadField(fields[fieldIndex]);
+  }
+
+  private boolean doReadField(Field field) {
+    final int fieldStart = currentFieldStart;
+    final int fieldLength = currentFieldLength;
     if (fieldLength < 0) {
       return false;
     }
@@ -369,222 +638,252 @@ public final class LazySimpleDeserializeRead extends DeserializeRead {
       /*
        * We have a field and are positioned to it.  Read it.
        */
-      switch (primitiveCategories[fieldIndex]) {
-      case BOOLEAN:
-        {
-          int i = fieldStart;
-          if (fieldLength == 4) {
-            if ((bytes[i] == 'T' || bytes[i] == 't') &&
-                (bytes[i + 1] == 'R' || bytes[i + 1] == 'r') &&
-                (bytes[i + 2] == 'U' || bytes[i + 2] == 'u') &&
-                (bytes[i + 3] == 'E' || bytes[i + 3] == 'e')) {
-              currentBoolean = true;
-            } else {
-              // No boolean value match for 4 char field.
-              return false;
-            }
-          } else if (fieldLength == 5) {
-            if ((bytes[i] == 'F' || bytes[i] == 'f') &&
-                (bytes[i + 1] == 'A' || bytes[i + 1] == 'a') &&
-                (bytes[i + 2] == 'L' || bytes[i + 2] == 'l') &&
-                (bytes[i + 3] == 'S' || bytes[i + 3] == 's') &&
-                (bytes[i + 4] == 'E' || bytes[i + 4] == 'e')) {
-              currentBoolean = false;
-            } else {
-              // No boolean value match for 5 char field.
-              return false;
-            }
-          } else if (isExtendedBooleanLiteral && fieldLength == 1) {
-            byte b = bytes[fieldStart];
-            if (b == '1' || b == 't' || b == 'T') {
-              currentBoolean = true;
-            } else if (b == '0' || b == 'f' || b == 'F') {
-              currentBoolean = false;
+      if (field.isPrimitive) {
+        switch (field.primitiveCategory) {
+        case BOOLEAN:
+          {
+            int i = fieldStart;
+            if (fieldLength == 4) {
+              if ((bytes[i] == 'T' || bytes[i] == 't') &&
+                  (bytes[i + 1] == 'R' || bytes[i + 1] == 'r') &&
+                  (bytes[i + 2] == 'U' || bytes[i + 2] == 'u') &&
+                  (bytes[i + 3] == 'E' || bytes[i + 3] == 'e')) {
+                currentBoolean = true;
+              } else {
+                // No boolean value match for 4 char field.
+                return false;
+              }
+            } else if (fieldLength == 5) {
+              if ((bytes[i] == 'F' || bytes[i] == 'f') &&
+                  (bytes[i + 1] == 'A' || bytes[i + 1] == 'a') &&
+                  (bytes[i + 2] == 'L' || bytes[i + 2] == 'l') &&
+                  (bytes[i + 3] == 'S' || bytes[i + 3] == 's') &&
+                  (bytes[i + 4] == 'E' || bytes[i + 4] == 'e')) {
+                currentBoolean = false;
+              } else {
+                // No boolean value match for 5 char field.
+                return false;
+              }
+            } else if (isExtendedBooleanLiteral && fieldLength == 1) {
+              byte b = bytes[fieldStart];
+              if (b == '1' || b == 't' || b == 'T') {
+                currentBoolean = true;
+              } else if (b == '0' || b == 'f' || b == 'F') {
+                currentBoolean = false;
+              } else {
+                // No boolean value match for extended 1 char field.
+                return false;
+              }
             } else {
-              // No boolean value match for extended 1 char field.
+              // No boolean value match for other lengths.
               return false;
             }
-          } else {
-            // No boolean value match for other lengths.
+          }
+          return true;
+        case BYTE:
+          if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
             return false;
           }
-        }
-        return true;
-      case BYTE:
-        if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
-          return false;
-        }
-        currentByte = LazyByte.parseByte(bytes, fieldStart, fieldLength, 10);
-        return true;
-      case SHORT:
-        if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
-          return false;
-        }
-        currentShort = LazyShort.parseShort(bytes, fieldStart, fieldLength, 10);
-        return true;
-      case INT:
-        if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
-          return false;
-        }
-        currentInt = LazyInteger.parseInt(bytes, fieldStart, fieldLength, 10);
-        return true;
-      case LONG:
-        if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
-          return false;
-        }
-        currentLong = LazyLong.parseLong(bytes, fieldStart, fieldLength, 10);
-        return true;
-      case FLOAT:
-        if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
-          return false;
-        }
-        currentFloat =
-            Float.parseFloat(
-                new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8));
-        return true;
-      case DOUBLE:
-        if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
-          return false;
-        }
-        currentDouble = StringToDouble.strtod(bytes, fieldStart, fieldLength);
-        return true;
-      case STRING:
-      case CHAR:
-      case VARCHAR:
-        {
-          if (isEscaped) {
-            if (escapeCounts[fieldIndex] == 0) {
-              // No escaping.
+          currentByte = LazyByte.parseByte(bytes, fieldStart, fieldLength, 10);
+          return true;
+        case SHORT:
+          if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
+            return false;
+          }
+          currentShort = LazyShort.parseShort(bytes, fieldStart, fieldLength, 10);
+          return true;
+        case INT:
+          if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
+            return false;
+          }
+          currentInt = LazyInteger.parseInt(bytes, fieldStart, fieldLength, 10);
+          return true;
+        case LONG:
+          if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
+            return false;
+          }
+          currentLong = LazyLong.parseLong(bytes, fieldStart, fieldLength, 10);
+          return true;
+        case FLOAT:
+          if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
+            return false;
+          }
+          currentFloat =
+              Float.parseFloat(
+                  new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8));
+          return true;
+        case DOUBLE:
+          if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
+            return false;
+          }
+          currentDouble = StringToDouble.strtod(bytes, fieldStart, fieldLength);
+          return true;
+        case STRING:
+        case CHAR:
+        case VARCHAR:
+          {
+            if (isEscaped) {
+              if (currentEscapeCount == 0) {
+                // No escaping.
+                currentExternalBufferNeeded = false;
+                currentBytes = bytes;
+                currentBytesStart = fieldStart;
+                currentBytesLength = fieldLength;
+              } else {
+                final int unescapedLength = fieldLength - currentEscapeCount;
+                if (useExternalBuffer) {
+                  currentExternalBufferNeeded = true;
+                  currentExternalBufferNeededLen = unescapedLength;
+                } else {
+                  // The copyToBuffer will reposition and re-read the input buffer.
+                  currentExternalBufferNeeded = false;
+                  if (internalBufferLen < unescapedLength) {
+                    internalBufferLen = unescapedLength;
+                    internalBuffer = new byte[internalBufferLen];
+                  }
+                  copyToBuffer(internalBuffer, 0, unescapedLength);
+                  currentBytes = internalBuffer;
+                  currentBytesStart = 0;
+                  currentBytesLength = unescapedLength;
+                }
+              }
+            } else {
+              // If the data is not escaped, reference the data directly.
               currentExternalBufferNeeded = false;
               currentBytes = bytes;
               currentBytesStart = fieldStart;
               currentBytesLength = fieldLength;
-            } else {
-              final int unescapedLength = fieldLength - escapeCounts[fieldIndex];
-              if (useExternalBuffer) {
-                currentExternalBufferNeeded = true;
-                currentExternalBufferNeededLen = unescapedLength;
-              } else {
-                // The copyToBuffer will reposition and re-read the input buffer.
-                currentExternalBufferNeeded = false;
-                if (internalBufferLen < unescapedLength) {
-                  internalBufferLen = unescapedLength;
-                  internalBuffer = new byte[internalBufferLen];
-                }
-                copyToBuffer(internalBuffer, 0, unescapedLength);
-                currentBytes = internalBuffer;
-                currentBytesStart = 0;
-                currentBytesLength = unescapedLength;
-              }
             }
-          } else {
-            // If the data is not escaped, reference the data directly.
-            currentExternalBufferNeeded = false;
-            currentBytes = bytes;
-            currentBytesStart = fieldStart;
-            currentBytesLength = fieldLength;
           }
-        }
-        return true;
-      case BINARY:
-        {
-          byte[] recv = new byte[fieldLength];
-          System.arraycopy(bytes, fieldStart, recv, 0, fieldLength);
-          byte[] decoded = LazyBinary.decodeIfNeeded(recv);
-          // use the original bytes in case decoding should fail
-          decoded = decoded.length > 0 ? decoded : recv;
-          currentBytes = decoded;
-          currentBytesStart = 0;
-          currentBytesLength = decoded.length;
-        }
-        return true;
-      case DATE:
-        if (!LazyUtils.isDateMaybe(bytes, fieldStart, fieldLength)) {
-          return false;
-        }
-        currentDateWritable.set(
-            Date.valueOf(
-                new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8)));
-        return true;
-      case TIMESTAMP:
-        {
+          return true;
+        case BINARY:
+          {
+            byte[] recv = new byte[fieldLength];
+            System.arraycopy(bytes, fieldStart, recv, 0, fieldLength);
+            byte[] decoded = LazyBinary.decodeIfNeeded(recv);
+            // use the original bytes in case decoding should fail
+            decoded = decoded.length > 0 ? decoded : recv;
+            currentBytes = decoded;
+            currentBytesStart = 0;
+            currentBytesLength = decoded.length;
+          }
+          return true;
+        case DATE:
           if (!LazyUtils.isDateMaybe(bytes, fieldStart, fieldLength)) {
             return false;
           }
-          String s = new String(bytes, fieldStart, fieldLength, StandardCharsets.US_ASCII);
-          if (s.compareTo("NULL") == 0) {
-            logExceptionMessage(bytes, fieldStart, fieldLength, "TIMESTAMP");
+          currentDateWritable.set(
+              Date.valueOf(
+                  new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8)));
+          return true;
+        case TIMESTAMP:
+          {
+            if (!LazyUtils.isDateMaybe(bytes, fieldStart, fieldLength)) {
+              return false;
+            }
+            String s = new String(bytes, fieldStart, fieldLength, StandardCharsets.US_ASCII);
+            if (s.compareTo("NULL") == 0) {
+              logExceptionMessage(bytes, fieldStart, fieldLength, "TIMESTAMP");
+              return false;
+            }
+            try {
+              currentTimestampWritable.set(timestampParser.parseTimestamp(s));
+            } catch (IllegalArgumentException e) {
+              logExceptionMessage(bytes, fieldStart, fieldLength, "TIMESTAMP");
+              return false;
+            }
+          }
+          return true;
+        case INTERVAL_YEAR_MONTH:
+          if (fieldLength == 0) {
             return false;
           }
           try {
-            currentTimestampWritable.set(timestampParser.parseTimestamp(s));
-          } catch (IllegalArgumentException e) {
-            logExceptionMessage(bytes, fieldStart, fieldLength, "TIMESTAMP");
+            String s = new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8);
+            currentHiveIntervalYearMonthWritable.set(HiveIntervalYearMonth.valueOf(s));
+          } catch (Exception e) {
+            logExceptionMessage(bytes, fieldStart, fieldLength, "INTERVAL_YEAR_MONTH");
             return false;
           }
-        }
-        return true;
-      case INTERVAL_YEAR_MONTH:
-        if (fieldLength == 0) {
-          return false;
-        }
-        try {
-          String s = new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8);
-          currentHiveIntervalYearMonthWritable.set(HiveIntervalYearMonth.valueOf(s));
-        } catch (Exception e) {
-          logExceptionMessage(bytes, fieldStart, fieldLength, "INTERVAL_YEAR_MONTH");
-          return false;
-        }
-        return true;
-      case INTERVAL_DAY_TIME:
-        if (fieldLength == 0) {
-          return false;
-        }
-        try {
-          String s = new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8);
-          currentHiveIntervalDayTimeWritable.set(HiveIntervalDayTime.valueOf(s));
-        } catch (Exception e) {
-          logExceptionMessage(bytes, fieldStart, fieldLength, "INTERVAL_DAY_TIME");
-          return false;
-        }
-        return true;
-      case DECIMAL:
-        {
-          if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
+          return true;
+        case INTERVAL_DAY_TIME:
+          if (fieldLength == 0) {
+            return false;
+          }
+          try {
+            String s = new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8);
+            currentHiveIntervalDayTimeWritable.set(HiveIntervalDayTime.valueOf(s));
+          } catch (Exception e) {
+            logExceptionMessage(bytes, fieldStart, fieldLength, "INTERVAL_DAY_TIME");
             return false;
           }
-          // Trim blanks because OldHiveDecimal did...
-          currentHiveDecimalWritable.setFromBytes(bytes, fieldStart, fieldLength, /* trimBlanks */ true);
-          boolean decimalIsNull = !currentHiveDecimalWritable.isSet();
-          if (!decimalIsNull) {
-            DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfos[fieldIndex];
+          return true;
+        case DECIMAL:
+          {
+            if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
+              return false;
+            }
+            // Trim blanks because OldHiveDecimal did...
+            currentHiveDecimalWritable.setFromBytes(bytes, fieldStart, fieldLength, /* trimBlanks */ true);
+            boolean decimalIsNull = !currentHiveDecimalWritable.isSet();
+            if (!decimalIsNull) {
+              DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) field.typeInfo;
 
-            int precision = decimalTypeInfo.getPrecision();
-            int scale = decimalTypeInfo.getScale();
+              int precision = decimalTypeInfo.getPrecision();
+              int scale = decimalTypeInfo.getScale();
 
-            decimalIsNull = !currentHiveDecimalWritable.mutateEnforcePrecisionScale(precision, scale);
+              decimalIsNull = !currentHiveDecimalWritable.mutateEnforcePrecisionScale(precision, scale);
+            }
+            if (decimalIsNull) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Data not in the HiveDecimal data type range so converted to null. Given data is :"
+                  + new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8));
+              }
+              return false;
+            }
           }
-          if (decimalIsNull) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Data not in the HiveDecimal data type range so converted to null. Given data is :"
-                + new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8));
+          return true;
+
+        default:
+          throw new Error("Unexpected primitive category " + field.primitiveCategory);
+        }
+      } else {
+        switch (field.complexCategory) {
+        case LIST:
+        case MAP:
+        case STRUCT:
+        case UNION:
+          {
+            if (currentLevel > 0) {
+  
+              // Check for Map which occupies 2 levels (key separator and key/value pair separator).
+              if (currentComplexTypeHelpers[currentLevel - 1] == null) {
+                Preconditions.checkState(currentLevel > 1);
+                Preconditions.checkState(
+                    currentComplexTypeHelpers[currentLevel - 2] instanceof MapComplexTypeHelper);
+                currentLevel++;
+              }
             }
-            return false;
+            ComplexTypeHelper complexTypeHelper = field.complexTypeHelper; 
+            currentComplexTypeHelpers[currentLevel++] = complexTypeHelper;
+            if (field.complexCategory == Category.MAP) {
+              currentComplexTypeHelpers[currentLevel] = null;
+            }
+  
+            // Set up context for readNextComplexField.
+            complexTypeHelper.setCurrentFieldInfo(currentFieldStart, currentFieldLength);
           }
+          return true;
+        default:
+          throw new Error("Unexpected complex category " + field.complexCategory);
         }
-        return true;
-
-      default:
-        throw new Error("Unexpected primitive category " + primitiveCategories[fieldIndex].name());
       }
     } catch (NumberFormatException nfe) {
-       // U+FFFD will throw this as well
-       logExceptionMessage(bytes, fieldStart, fieldLength, primitiveCategories[fieldIndex]);
+       logExceptionMessage(bytes, fieldStart, fieldLength, field.complexCategory, field.primitiveCategory);
        return false;
     } catch (IllegalArgumentException iae) {
-       // E.g. can be thrown by Date.valueOf
-       logExceptionMessage(bytes, fieldStart, fieldLength, primitiveCategories[fieldIndex]);
-       return false;
+      logExceptionMessage(bytes, fieldStart, fieldLength, field.complexCategory, field.primitiveCategory);
+      return false;
     }
   }
 
@@ -616,6 +915,248 @@ public final class LazySimpleDeserializeRead extends DeserializeRead {
     }
   }
 
+  @Override
+  public boolean isNextComplexMultiValue() {  // True while the innermost LIST/MAP has more to read; pops it once exhausted.
+    Preconditions.checkState(currentLevel > 0);
+
+    final ComplexTypeHelper complexTypeHelper = currentComplexTypeHelpers[currentLevel - 1];  // Innermost open complex type.
+    final Field complexField = complexTypeHelper.complexField;
+    final int fieldPosition = complexTypeHelper.fieldPosition;
+    final int complexFieldEnd = complexTypeHelper.complexFieldEnd;
+    switch (complexField.complexCategory) {
+    case LIST:
+      {
+        // Allow for empty string, etc.: <= still reports a zero-length element at complexFieldEnd.
+        final boolean isNext = (fieldPosition <= complexFieldEnd);
+        if (!isNext) {
+          popComplexType();
+        }
+        return isNext;
+      }
+    case MAP:
+      {
+        final boolean isNext = (fieldPosition < complexFieldEnd);  // Strict <, unlike LIST above.
+        if (!isNext) {
+          popComplexType();
+        }
+        return isNext;
+      }
+    case STRUCT:
+    case UNION:
+      throw new Error("Complex category " + complexField.complexCategory + " not multi-value");
+    default:
+      throw new Error("Unexpected complex category " + complexField.complexCategory);
+    }
+  }
+
+  private void popComplexType() {  // Leaves the innermost complex type by lowering currentLevel.
+    Preconditions.checkState(currentLevel > 0);
+    currentLevel--;
+    if (currentLevel > 0) {
+
+      // A null helper slot marks a Map's 2nd level (Map occupies 2 levels: key separator and key/value pair separator); drop it too.
+      if (currentComplexTypeHelpers[currentLevel - 1] == null) {
+        Preconditions.checkState(currentLevel > 1);
+        Preconditions.checkState(
+            currentComplexTypeHelpers[currentLevel - 2] instanceof MapComplexTypeHelper);
+        currentLevel--;
+      }
+    }
+  }
+
+  /* Reads the next child (list element, map key or value, struct field, union tag or value) of
+   * the innermost open complex type. NOTE: There is an expectation that all fields will be read-thru.
+   */
+  @Override
+  public boolean readComplexField() throws IOException {  // Returns doReadField's result for the child; a union tag read returns true or throws.
+
+    Preconditions.checkState(currentLevel > 0);
+
+    final ComplexTypeHelper complexTypeHelper = currentComplexTypeHelpers[currentLevel - 1];
+    final Field complexField = complexTypeHelper.complexField;
+    switch (complexField.complexCategory) {
+    case LIST:
+      {
+        final ListComplexTypeHelper listHelper = (ListComplexTypeHelper) complexTypeHelper;
+        final int fieldPosition = listHelper.fieldPosition;
+        final int complexFieldEnd = listHelper.complexFieldEnd;
+        Preconditions.checkState(fieldPosition <= complexFieldEnd);
+
+        final int fieldEnd = parseComplexField(fieldPosition, complexFieldEnd, currentLevel);
+        listHelper.fieldPosition = fieldEnd + 1;  // Move past separator.
+
+        currentFieldStart = fieldPosition;
+        currentFieldLength = fieldEnd - fieldPosition;
+
+        return doReadField(listHelper.elementField);
+      }
+    case MAP:
+      {
+        final MapComplexTypeHelper mapHelper = (MapComplexTypeHelper) complexTypeHelper;
+        final int fieldPosition = mapHelper.fieldPosition;
+        final int complexFieldEnd = mapHelper.complexFieldEnd;
+        Preconditions.checkState(fieldPosition <= complexFieldEnd);
+  
+        currentFieldStart = fieldPosition;
+
+        final boolean isParentMap = isParentMap();
+        if (isParentMap) {
+          currentLevel++;  // NOTE(review): level is bumped for parsing and restored before returning; rationale not visible here -- confirm.
+        }
+        int fieldEnd;
+        if (!mapHelper.fieldHaveParsedKey) {
+
+          // Parse until key separator (currentLevel + 1).
+          fieldEnd = parseComplexField(fieldPosition, complexFieldEnd, currentLevel + 1);
+
+          mapHelper.fieldPosition = fieldEnd + 1;  // Move past key separator.
+
+          currentFieldLength = fieldEnd - fieldPosition;
+
+          mapHelper.fieldHaveParsedKey = true;  // The next call reads the value.
+          final boolean result = doReadField(mapHelper.keyField);
+          if (isParentMap) {
+            currentLevel--;
+          }
+          return result;
+        } else {
+
+          // Parse until pair separator (currentLevel).
+          fieldEnd = parseComplexField(fieldPosition, complexFieldEnd, currentLevel);
+
+          mapHelper.fieldPosition = fieldEnd + 1;  // Move past pair separator.
+
+          currentFieldLength = fieldEnd - fieldPosition;
+
+          mapHelper.fieldHaveParsedKey = false;  // The next call reads a key again.
+          final boolean result = doReadField(mapHelper.valueField);
+          if (isParentMap) {
+            currentLevel--;
+          }
+          return result;
+        }
+      }
+    case STRUCT:
+      {
+        final StructComplexTypeHelper structHelper = (StructComplexTypeHelper) complexTypeHelper;
+        final int fieldPosition = structHelper.fieldPosition;
+        final int complexFieldEnd = structHelper.complexFieldEnd;
+        Preconditions.checkState(fieldPosition <= complexFieldEnd);
+
+        currentFieldStart = fieldPosition;
+
+        final int nextFieldIndex = structHelper.nextFieldIndex;
+        final Field[] fields = structHelper.fields;
+        final int fieldEnd;
+        if (nextFieldIndex != fields.length - 1) {
+
+          // Parse until field separator (currentLevel).
+          fieldEnd = parseComplexField(fieldPosition, complexFieldEnd, currentLevel);
+
+          structHelper.fieldPosition = fieldEnd + 1;  // Move past field separator.
+
+          currentFieldLength = fieldEnd - fieldPosition;
+
+          return doReadField(fields[structHelper.nextFieldIndex++]);
+        } else {
+
+          if (!isEscaped) {
+
+            // No parsing necessary -- the end is the parent's end.
+            structHelper.fieldPosition = complexFieldEnd + 1;  // Move past parent field separator.
+            currentEscapeCount = 0;
+          } else {
+            // We must parse to get the escape count.
+            fieldEnd = parseComplexField(fieldPosition, complexFieldEnd, currentLevel - 1);  // NOTE(review): fieldEnd unused; fieldPosition not advanced on this path -- confirm.
+          }
+
+          currentFieldLength = complexFieldEnd - fieldPosition;
+
+          structHelper.nextFieldIndex = 0;  // Last field: reset the index to the first field.
+          return doReadField(fields[fields.length - 1]);
+        }
+      }
+    case UNION:
+      {
+        final UnionComplexTypeHelper unionHelper = (UnionComplexTypeHelper) complexTypeHelper;
+        final int fieldPosition = unionHelper.fieldPosition;
+        final int complexFieldEnd = unionHelper.complexFieldEnd;
+        Preconditions.checkState(fieldPosition <= complexFieldEnd);
+
+        currentFieldStart = fieldPosition;
+
+        final int fieldEnd;
+        if (!unionHelper.fieldHaveParsedTag) {
+          boolean isParentMap = isParentMap();
+          if (isParentMap) {
+            currentLevel++;  // NOTE(review): same temporary level bump as in the MAP case; restored below -- confirm rationale.
+          }
+
+          // Parse until union separator (currentLevel).
+          fieldEnd = parseComplexField(fieldPosition, complexFieldEnd, currentLevel);
+
+          unionHelper.fieldPosition = fieldEnd + 1;  // Move past union separator.
+
+          currentFieldLength = fieldEnd - fieldPosition;
+
+          unionHelper.fieldHaveParsedTag = true;  // The next call reads the tagged value.
+          boolean successful = doReadField(unionHelper.tagField);
+          if (!successful) {
+            throw new IOException("Null union tag");  // NOTE(review): thrown before the currentLevel-- below when isParentMap -- confirm acceptable.
+          }
+          unionHelper.fieldTag = currentInt;  // Remember the tag; it indexes unionHelper.fields on the next call.
+
+          if (isParentMap) {
+            currentLevel--;
+          }
+          return true;
+        } else {
+
+          if (!isEscaped) {
+
+            // No parsing necessary -- the end is the parent's end.
+            unionHelper.fieldPosition = complexFieldEnd + 1;  // Move past parent field separator.
+            currentEscapeCount = 0;
+          } else {
+            // We must parse to get the escape count.
+            fieldEnd = parseComplexField(fieldPosition, complexFieldEnd, currentLevel - 1);  // NOTE(review): fieldEnd unused; fieldPosition not advanced on this path -- confirm.
+          }
+
+          currentFieldLength = complexFieldEnd - fieldPosition;
+
+          unionHelper.fieldHaveParsedTag = false;  // The next call reads a tag again.
+          return doReadField(unionHelper.fields[unionHelper.fieldTag]);
+        }
+      }
+    default:
+      throw new Error("Unexpected complex category " + complexField.complexCategory);
+    }
+  }
+
+  private boolean isParentMap() {  // True when the helper at currentLevel - 2 is a MapComplexTypeHelper.
+    return currentLevel >= 2 &&
+        currentComplexTypeHelpers[currentLevel - 2] instanceof MapComplexTypeHelper;
+  }
+
+  @Override
+  public void finishComplexVariableFieldsType() {  // Pops the innermost STRUCT/UNION; LIST and MAP are rejected with an Error.
+    Preconditions.checkState(currentLevel > 0);
+
+    final ComplexTypeHelper complexTypeHelper = currentComplexTypeHelpers[currentLevel - 1];
+    final Field complexField = complexTypeHelper.complexField;
+    switch (complexField.complexCategory) {
+    case LIST:
+    case MAP:
+      throw new Error("Complex category " + complexField.complexCategory + " is not variable fields type");  // These are popped by isNextComplexMultiValue instead.
+    case STRUCT:
+    case UNION:
+      popComplexType();
+      break;
+    default:
+      throw new Error("Unexpected category " + complexField.complexCategory);
+    }
+  }
+
   /*
    * Call this method may be called after all the all fields have been read to check
    * for unread fields.
@@ -632,21 +1173,34 @@ public final class LazySimpleDeserializeRead extends DeserializeRead {
   }
 
   public void logExceptionMessage(byte[] bytes, int bytesStart, int bytesLength,
-      PrimitiveCategory dataCategory) {
+      Category dataComplexCategory, PrimitiveCategory dataPrimitiveCategory) {  // Picks a type name, then delegates to the String overload.
     final String dataType;
-    switch (dataCategory) {
-    case BYTE:
-      dataType = "TINYINT";
-      break;
-    case LONG:
-      dataType = "BIGINT";
-      break;
-    case SHORT:
-      dataType = "SMALLINT";
-      break;
-    default:
-      dataType = dataCategory.toString();
-      break;
+    if (dataComplexCategory == null) {  // Null complex category: name the type from the primitive category.
+      switch (dataPrimitiveCategory) {
+      case BYTE:
+        dataType = "TINYINT";
+        break;
+      case LONG:
+        dataType = "BIGINT";
+        break;
+      case SHORT:
+        dataType = "SMALLINT";
+        break;
+      default:
+        dataType = dataPrimitiveCategory.toString();
+        break;
+      }
+    } else {  // Complex category: its toString() is the type name.
+      switch (dataComplexCategory) {
+      case LIST:
+      case MAP:
+      case STRUCT:
+      case UNION:
+        dataType = dataComplexCategory.toString();
+        break;
+      default:
+        throw new Error("Unexpected complex category " + dataComplexCategory);
+      }
     }
     logExceptionMessage(bytes, bytesStart, bytesLength, dataType);
   }