You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/24 23:51:51 UTC

[17/54] [abbrv] hive git commit: HIVE-16207: Add support for Complex Types in Fast SerDe (Teddy Choi, reviewed by Matt McCline)

http://git-wip-us.apache.org/repos/asf/hive/blob/d467e172/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/TestVectorMapJoinFastRowHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/TestVectorMapJoinFastRowHashMap.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/TestVectorMapJoinFastRowHashMap.java
index ebb243e..82d9e29 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/TestVectorMapJoinFastRowHashMap.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/TestVectorMapJoinFastRowHashMap.java
@@ -34,13 +34,12 @@ import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerialize
 import org.apache.hadoop.hive.serde2.fast.SerializeWrite;
 import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Writable;
 import org.junit.Test;
 
 /*
@@ -83,8 +82,8 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
             keyColumnNullMarker, keyColumnNotNullMarker);
 
 
-    PrimitiveTypeInfo[] valuePrimitiveTypeInfos = valueSource.primitiveTypeInfos();
-    final int columnCount = valuePrimitiveTypeInfos.length;
+    TypeInfo[] valueTypeInfos = valueSource.typeInfos();
+    final int columnCount = valueTypeInfos.length;
 
     SerializeWrite valueSerializeWrite = new LazyBinarySerializeWrite(columnCount);
 
@@ -97,10 +96,7 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
         ((LazyBinarySerializeWrite) valueSerializeWrite).set(valueOutput);
 
       for (int index = 0; index < columnCount; index++) {
-
-        Writable writable = (Writable) valueRow[index];
-
-        VerifyFastRow.serializeWrite(valueSerializeWrite, valuePrimitiveTypeInfos[index], writable);
+        VerifyFastRow.serializeWrite(valueSerializeWrite, valueTypeInfos[index], valueRow[index]);
       }
 
       byte[] value = Arrays.copyOf(valueOutput.getData(), valueOutput.getLength());
@@ -109,17 +105,13 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
       byte[] key;
       if (random.nextBoolean() || verifyTable.getCount() == 0) {
         Object[] keyRow =
-            VectorRandomRowSource.randomRow(keyCount, random, keyPrimitiveObjectInspectorList,
-                keyPrimitiveCategories, keyPrimitiveTypeInfos);
+            VectorRandomRowSource.randomWritablePrimitiveRow(keyCount, random, keyPrimitiveTypeInfos);
 
         Output keyOutput = new Output();
         keySerializeWrite.set(keyOutput);
 
         for (int index = 0; index < keyCount; index++) {
-
-          Writable writable = (Writable) keyRow[index];
-
-          VerifyFastRow.serializeWrite(keySerializeWrite, keyPrimitiveTypeInfos[index], writable);
+          VerifyFastRow.serializeWrite(keySerializeWrite, keyPrimitiveTypeInfos[index], keyRow[index]);
         }
 
         key = Arrays.copyOf(keyOutput.getData(), keyOutput.getLength());
@@ -135,7 +127,7 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
       map.putRow(keyWritable, valueWritable);
       // verifyTable.verify(map);
     }
-    verifyTable.verify(map, hashTableKeyType, valuePrimitiveTypeInfos,
+    verifyTable.verify(map, hashTableKeyType, valueTypeInfos,
         doClipping, useExactBytes, random);
   }
 
@@ -152,9 +144,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
     VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
 
     VectorRandomRowSource valueSource = new VectorRandomRowSource();
-    valueSource.init(random);
+    
+    valueSource.init(random, VectorRandomRowSource.SupportedTypes.ALL, 4, false);
 
-    int rowCount = 10000;
+    int rowCount = 1000;
     Object[][] rows = valueSource.randomRows(rowCount);
 
     addAndVerifyRows(valueSource, rows,
@@ -176,9 +169,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
     VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
 
     VectorRandomRowSource valueSource = new VectorRandomRowSource();
-    valueSource.init(random);
 
-    int rowCount = 10000;
+    valueSource.init(random, VectorRandomRowSource.SupportedTypes.ALL, 4, false);
+
+    int rowCount = 1000;
     Object[][] rows = valueSource.randomRows(rowCount);
 
     addAndVerifyRows(valueSource, rows,
@@ -200,9 +194,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
     VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
 
     VectorRandomRowSource valueSource = new VectorRandomRowSource();
-    valueSource.init(random);
+    
+    valueSource.init(random, VectorRandomRowSource.SupportedTypes.ALL, 4, false);
 
-    int rowCount = 10000;
+    int rowCount = 1000;
     Object[][] rows = valueSource.randomRows(rowCount);
 
     addAndVerifyRows(valueSource, rows,
@@ -224,9 +219,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
     VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
 
     VectorRandomRowSource valueSource = new VectorRandomRowSource();
-    valueSource.init(random);
 
-    int rowCount = 10000;
+    valueSource.init(random, VectorRandomRowSource.SupportedTypes.ALL, 4, false);
+
+    int rowCount = 1000;
     Object[][] rows = valueSource.randomRows(rowCount);
 
     addAndVerifyRows(valueSource, rows,
@@ -248,9 +244,11 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
     VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
 
     VectorRandomRowSource valueSource = new VectorRandomRowSource();
-    valueSource.init(random);
 
-    int rowCount = 10000;
+    
+    valueSource.init(random, VectorRandomRowSource.SupportedTypes.ALL, 4, false);
+
+    int rowCount = 1000;
     Object[][] rows = valueSource.randomRows(rowCount);
 
     addAndVerifyRows(valueSource, rows,
@@ -272,9 +270,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
     VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
 
     VectorRandomRowSource valueSource = new VectorRandomRowSource();
-    valueSource.init(random);
 
-    int rowCount = 10000;
+    valueSource.init(random, VectorRandomRowSource.SupportedTypes.ALL, 4, false);
+
+    int rowCount = 1000;
     Object[][] rows = valueSource.randomRows(rowCount);
 
     addAndVerifyRows(valueSource, rows,
@@ -296,9 +295,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
     VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
 
     VectorRandomRowSource valueSource = new VectorRandomRowSource();
-    valueSource.init(random);
 
-    int rowCount = 10000;
+    valueSource.init(random, VectorRandomRowSource.SupportedTypes.ALL, 4, false);
+
+    int rowCount = 1000;
     Object[][] rows = valueSource.randomRows(rowCount);
 
     addAndVerifyRows(valueSource, rows,
@@ -320,9 +320,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
     VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
 
     VectorRandomRowSource valueSource = new VectorRandomRowSource();
-    valueSource.init(random);
 
-    int rowCount = 10000;
+    valueSource.init(random, VectorRandomRowSource.SupportedTypes.ALL, 4, false);
+
+    int rowCount = 1000;
     Object[][] rows = valueSource.randomRows(rowCount);
 
     addAndVerifyRows(valueSource, rows,
@@ -344,9 +345,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
     VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
 
     VectorRandomRowSource valueSource = new VectorRandomRowSource();
-    valueSource.init(random);
 
-    int rowCount = 10000;
+    valueSource.init(random, VectorRandomRowSource.SupportedTypes.ALL, 4, false);
+
+    int rowCount = 1000;
     Object[][] rows = valueSource.randomRows(rowCount);
 
     addAndVerifyRows(valueSource, rows,
@@ -368,9 +370,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
     VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
 
     VectorRandomRowSource valueSource = new VectorRandomRowSource();
-    valueSource.init(random);
 
-    int rowCount = 10000;
+    valueSource.init(random, VectorRandomRowSource.SupportedTypes.ALL, 4, false);
+
+    int rowCount = 1000;
     Object[][] rows = valueSource.randomRows(rowCount);
 
     addAndVerifyRows(valueSource, rows,
@@ -392,9 +395,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
     VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
 
     VectorRandomRowSource valueSource = new VectorRandomRowSource();
-    valueSource.init(random);
 
-    int rowCount = 10000;
+    valueSource.init(random, VectorRandomRowSource.SupportedTypes.ALL, 4, false);
+
+    int rowCount = 1000;
     Object[][] rows = valueSource.randomRows(rowCount);
 
     addAndVerifyRows(valueSource, rows,
@@ -416,9 +420,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
     VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
 
     VectorRandomRowSource valueSource = new VectorRandomRowSource();
-    valueSource.init(random);
 
-    int rowCount = 10000;
+    valueSource.init(random, VectorRandomRowSource.SupportedTypes.ALL, 4, false);
+
+    int rowCount = 1000;
     Object[][] rows = valueSource.randomRows(rowCount);
 
     addAndVerifyRows(valueSource, rows,
@@ -441,9 +446,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
     VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
 
     VectorRandomRowSource valueSource = new VectorRandomRowSource();
-    valueSource.init(random);
 
-    int rowCount = 10000;
+    valueSource.init(random, VectorRandomRowSource.SupportedTypes.ALL, 4, false);
+
+    int rowCount = 1000;
     Object[][] rows = valueSource.randomRows(rowCount);
 
     addAndVerifyRows(valueSource, rows,
@@ -465,9 +471,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
     VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
 
     VectorRandomRowSource valueSource = new VectorRandomRowSource();
-    valueSource.init(random);
 
-    int rowCount = 10000;
+    valueSource.init(random, VectorRandomRowSource.SupportedTypes.ALL, 4, false);
+
+    int rowCount = 1000;
     Object[][] rows = valueSource.randomRows(rowCount);
 
     addAndVerifyRows(valueSource, rows,
@@ -489,9 +496,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
     VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
 
     VectorRandomRowSource valueSource = new VectorRandomRowSource();
-    valueSource.init(random);
 
-    int rowCount = 10000;
+    valueSource.init(random, VectorRandomRowSource.SupportedTypes.ALL, 4, false);
+
+    int rowCount = 1000;
     Object[][] rows = valueSource.randomRows(rowCount);
 
     addAndVerifyRows(valueSource, rows,
@@ -513,9 +521,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
     VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
 
     VectorRandomRowSource valueSource = new VectorRandomRowSource();
-    valueSource.init(random);
 
-    int rowCount = 10000;
+    valueSource.init(random, VectorRandomRowSource.SupportedTypes.ALL, 4, false);
+
+    int rowCount = 1000;
     Object[][] rows = valueSource.randomRows(rowCount);
 
     addAndVerifyRows(valueSource, rows,
@@ -537,9 +546,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
     VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
 
     VectorRandomRowSource valueSource = new VectorRandomRowSource();
-    valueSource.init(random);
 
-    int rowCount = 10000;
+    valueSource.init(random, VectorRandomRowSource.SupportedTypes.ALL, 4, false);
+
+    int rowCount = 1000;
     Object[][] rows = valueSource.randomRows(rowCount);
 
     addAndVerifyRows(valueSource, rows,
@@ -561,9 +571,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
     VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
 
     VectorRandomRowSource valueSource = new VectorRandomRowSource();
-    valueSource.init(random);
 
-    int rowCount = 10000;
+    valueSource.init(random, VectorRandomRowSource.SupportedTypes.ALL, 4, false);
+
+    int rowCount = 1000;
     Object[][] rows = valueSource.randomRows(rowCount);
 
     addAndVerifyRows(valueSource, rows,
@@ -585,9 +596,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
     VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
 
     VectorRandomRowSource valueSource = new VectorRandomRowSource();
-    valueSource.init(random);
 
-    int rowCount = 10000;
+    valueSource.init(random, VectorRandomRowSource.SupportedTypes.ALL, 4, false);
+
+    int rowCount = 1000;
     Object[][] rows = valueSource.randomRows(rowCount);
 
     addAndVerifyRows(valueSource, rows,
@@ -609,9 +621,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
     VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
 
     VectorRandomRowSource valueSource = new VectorRandomRowSource();
-    valueSource.init(random);
 
-    int rowCount = 10000;
+    valueSource.init(random, VectorRandomRowSource.SupportedTypes.ALL, 4, false);
+
+    int rowCount = 1000;
     Object[][] rows = valueSource.randomRows(rowCount);
 
     addAndVerifyRows(valueSource, rows,
@@ -633,9 +646,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
     VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
 
     VectorRandomRowSource valueSource = new VectorRandomRowSource();
-    valueSource.init(random);
 
-    int rowCount = 10000;
+    valueSource.init(random, VectorRandomRowSource.SupportedTypes.ALL, 4, false);
+
+    int rowCount = 1000;
     Object[][] rows = valueSource.randomRows(rowCount);
 
     addAndVerifyRows(valueSource, rows,
@@ -657,9 +671,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
     VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
 
     VectorRandomRowSource valueSource = new VectorRandomRowSource();
-    valueSource.init(random);
 
-    int rowCount = 10000;
+    valueSource.init(random, VectorRandomRowSource.SupportedTypes.ALL, 4, false);
+
+    int rowCount = 1000;
     Object[][] rows = valueSource.randomRows(rowCount);
 
     addAndVerifyRows(valueSource, rows,
@@ -681,9 +696,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
     VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
 
     VectorRandomRowSource valueSource = new VectorRandomRowSource();
-    valueSource.init(random);
 
-    int rowCount = 10000;
+    valueSource.init(random, VectorRandomRowSource.SupportedTypes.ALL, 4, false);
+
+    int rowCount = 1000;
     Object[][] rows = valueSource.randomRows(rowCount);
 
     addAndVerifyRows(valueSource, rows,
@@ -705,9 +721,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
     VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
 
     VectorRandomRowSource valueSource = new VectorRandomRowSource();
-    valueSource.init(random);
 
-    int rowCount = 10000;
+    valueSource.init(random, VectorRandomRowSource.SupportedTypes.ALL, 4, false);
+
+    int rowCount = 1000;
     Object[][] rows = valueSource.randomRows(rowCount);
 
     addAndVerifyRows(valueSource, rows,

http://git-wip-us.apache.org/repos/asf/hive/blob/d467e172/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VerifyFastRow.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VerifyFastRow.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VerifyFastRow.java
index 91b3ead..137df12 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VerifyFastRow.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VerifyFastRow.java
@@ -18,9 +18,14 @@
 package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.sql.Date;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import junit.framework.TestCase;
 
@@ -41,9 +46,16 @@ import org.apache.hadoop.hive.serde2.io.HiveIntervalYearMonthWritable;
 import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
@@ -60,341 +72,635 @@ import org.apache.hadoop.io.Writable;
 public class VerifyFastRow {
 
   public static void verifyDeserializeRead(DeserializeRead deserializeRead,
-      PrimitiveTypeInfo primitiveTypeInfo, Writable writable) throws IOException {
+      TypeInfo typeInfo, Object object) throws IOException {
 
     boolean isNull;
 
     isNull = !deserializeRead.readNextField();
+    doVerifyDeserializeRead(deserializeRead, typeInfo, object, isNull);
+  }
+
+  public static void doVerifyDeserializeRead(DeserializeRead deserializeRead,
+      TypeInfo typeInfo, Object object, boolean isNull) throws IOException {
     if (isNull) {
-      if (writable != null) {
-        TestCase.fail(
-            deserializeRead.getClass().getName() +
-            " field reports null but object is not null " +
-            "(class " + writable.getClass().getName() + ", " + writable.toString() + ")");
+      if (object != null) {
+        TestCase.fail("Field reports null but object is not null (class " + object.getClass().getName() + ", " + object.toString() + ")");
       }
       return;
-    } else if (writable == null) {
+    } else if (object == null) {
       TestCase.fail("Field report not null but object is null");
     }
-    switch (primitiveTypeInfo.getPrimitiveCategory()) {
-      case BOOLEAN:
-      {
-        boolean value = deserializeRead.currentBoolean;
-        if (!(writable instanceof BooleanWritable)) {
-          TestCase.fail("Boolean expected writable not Boolean");
-        }
-        boolean expected = ((BooleanWritable) writable).get();
-        if (value != expected) {
-          TestCase.fail("Boolean field mismatch (expected " + expected + " found " + value + ")");
-        }
-      }
-      break;
-    case BYTE:
-      {
-        byte value = deserializeRead.currentByte;
-        if (!(writable instanceof ByteWritable)) {
-          TestCase.fail("Byte expected writable not Byte");
-        }
-        byte expected = ((ByteWritable) writable).get();
-        if (value != expected) {
-          TestCase.fail("Byte field mismatch (expected " + (int) expected + " found " + (int) value + ")");
-        }
-      }
-      break;
-    case SHORT:
-      {
-        short value = deserializeRead.currentShort;
-        if (!(writable instanceof ShortWritable)) {
-          TestCase.fail("Short expected writable not Short");
-        }
-        short expected = ((ShortWritable) writable).get();
-        if (value != expected) {
-          TestCase.fail("Short field mismatch (expected " + expected + " found " + value + ")");
-        }
-      }
-      break;
-    case INT:
-      {
-        int value = deserializeRead.currentInt;
-        if (!(writable instanceof IntWritable)) {
-          TestCase.fail("Integer expected writable not Integer");
-        }
-        int expected = ((IntWritable) writable).get();
-        if (value != expected) {
-          TestCase.fail("Int field mismatch (expected " + expected + " found " + value + ")");
-        }
-      }
-      break;
-    case LONG:
-      {
-        long value = deserializeRead.currentLong;
-        if (!(writable instanceof LongWritable)) {
-          TestCase.fail("Long expected writable not Long");
-        }
-        Long expected = ((LongWritable) writable).get();
-        if (value != expected) {
-          TestCase.fail("Long field mismatch (expected " + expected + " found " + value + ")");
-        }
-      }
-      break;
-    case FLOAT:
-      {
-        float value = deserializeRead.currentFloat;
-        if (!(writable instanceof FloatWritable)) {
-          TestCase.fail("Float expected writable not Float");
-        }
-        float expected = ((FloatWritable) writable).get();
-        if (value != expected) {
-          TestCase.fail("Float field mismatch (expected " + expected + " found " + value + ")");
-        }
-      }
-      break;
-    case DOUBLE:
-      {
-        double value = deserializeRead.currentDouble;
-        if (!(writable instanceof DoubleWritable)) {
-          TestCase.fail("Double expected writable not Double");
-        }
-        double expected = ((DoubleWritable) writable).get();
-        if (value != expected) {
-          TestCase.fail("Double field mismatch (expected " + expected + " found " + value + ")");
-        }
-      }
-      break;
-    case STRING:
-      {
-        byte[] stringBytes = Arrays.copyOfRange(
-            deserializeRead.currentBytes,
-            deserializeRead.currentBytesStart,
-            deserializeRead.currentBytesStart + deserializeRead.currentBytesLength);
-        Text text = new Text(stringBytes);
-        String string = text.toString();
-        String expected = ((Text) writable).toString();
-        if (!string.equals(expected)) {
-          TestCase.fail("String field mismatch (expected '" + expected + "' found '" + string + "')");
-        }
-      }
-      break;
-    case CHAR:
-      {
-        byte[] stringBytes = Arrays.copyOfRange(
-            deserializeRead.currentBytes,
-            deserializeRead.currentBytesStart,
-            deserializeRead.currentBytesStart + deserializeRead.currentBytesLength);
-        Text text = new Text(stringBytes);
-        String string = text.toString();
+    switch (typeInfo.getCategory()) {
+    case PRIMITIVE:
+      {
+        PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
+        switch (primitiveTypeInfo.getPrimitiveCategory()) {
+        case BOOLEAN:
+          {
+            boolean value = deserializeRead.currentBoolean;
+            if (!(object instanceof BooleanWritable)) {
+              TestCase.fail("Boolean expected writable not Boolean");
+            }
+            boolean expected = ((BooleanWritable) object).get();
+            if (value != expected) {
+              TestCase.fail("Boolean field mismatch (expected " + expected + " found " + value + ")");
+            }
+          }
+          break;
+        case BYTE:
+          {
+            byte value = deserializeRead.currentByte;
+            if (!(object instanceof ByteWritable)) {
+              TestCase.fail("Byte expected writable not Byte");
+            }
+            byte expected = ((ByteWritable) object).get();
+            if (value != expected) {
+              TestCase.fail("Byte field mismatch (expected " + (int) expected + " found " + (int) value + ")");
+            }
+          }
+          break;
+        case SHORT:
+          {
+            short value = deserializeRead.currentShort;
+            if (!(object instanceof ShortWritable)) {
+              TestCase.fail("Short expected writable not Short");
+            }
+            short expected = ((ShortWritable) object).get();
+            if (value != expected) {
+              TestCase.fail("Short field mismatch (expected " + expected + " found " + value + ")");
+            }
+          }
+          break;
+        case INT:
+          {
+            int value = deserializeRead.currentInt;
+            if (!(object instanceof IntWritable)) {
+              TestCase.fail("Integer expected writable not Integer");
+            }
+            int expected = ((IntWritable) object).get();
+            if (value != expected) {
+              TestCase.fail("Int field mismatch (expected " + expected + " found " + value + ")");
+            }
+          }
+          break;
+        case LONG:
+          {
+            long value = deserializeRead.currentLong;
+            if (!(object instanceof LongWritable)) {
+              TestCase.fail("Long expected writable not Long");
+            }
+            Long expected = ((LongWritable) object).get();
+            if (value != expected) {
+              TestCase.fail("Long field mismatch (expected " + expected + " found " + value + ")");
+            }
+          }
+          break;
+        case FLOAT:
+          {
+            float value = deserializeRead.currentFloat;
+            if (!(object instanceof FloatWritable)) {
+              TestCase.fail("Float expected writable not Float");
+            }
+            float expected = ((FloatWritable) object).get();
+            if (value != expected) {
+              TestCase.fail("Float field mismatch (expected " + expected + " found " + value + ")");
+            }
+          }
+          break;
+        case DOUBLE:
+          {
+            double value = deserializeRead.currentDouble;
+            if (!(object instanceof DoubleWritable)) {
+              TestCase.fail("Double expected writable not Double");
+            }
+            double expected = ((DoubleWritable) object).get();
+            if (value != expected) {
+              TestCase.fail("Double field mismatch (expected " + expected + " found " + value + ")");
+            }
+          }
+          break;
+        case STRING:
+          {
+            byte[] stringBytes = Arrays.copyOfRange(
+                deserializeRead.currentBytes,
+                deserializeRead.currentBytesStart,
+                deserializeRead.currentBytesStart + deserializeRead.currentBytesLength);
+            Text text = new Text(stringBytes);
+            String string = text.toString();
+            String expected = ((Text) object).toString();
+            if (!string.equals(expected)) {
+              TestCase.fail("String field mismatch (expected '" + expected + "' found '" + string + "')");
+            }
+          }
+          break;
+        case CHAR:
+          {
+            byte[] stringBytes = Arrays.copyOfRange(
+                deserializeRead.currentBytes,
+                deserializeRead.currentBytesStart,
+                deserializeRead.currentBytesStart + deserializeRead.currentBytesLength);
+            Text text = new Text(stringBytes);
+            String string = text.toString();
 
-        HiveChar hiveChar = new HiveChar(string, ((CharTypeInfo) primitiveTypeInfo).getLength());
+            HiveChar hiveChar = new HiveChar(string, ((CharTypeInfo) primitiveTypeInfo).getLength());
 
-        HiveChar expected = ((HiveCharWritable) writable).getHiveChar();
-        if (!hiveChar.equals(expected)) {
-          TestCase.fail("Char field mismatch (expected '" + expected + "' found '" + hiveChar + "')");
-        }
-      }
-      break;
-    case VARCHAR:
-      {
-        byte[] stringBytes = Arrays.copyOfRange(
-            deserializeRead.currentBytes,
-            deserializeRead.currentBytesStart,
-            deserializeRead.currentBytesStart + deserializeRead.currentBytesLength);
-        Text text = new Text(stringBytes);
-        String string = text.toString();
+            HiveChar expected = ((HiveCharWritable) object).getHiveChar();
+            if (!hiveChar.equals(expected)) {
+              TestCase.fail("Char field mismatch (expected '" + expected + "' found '" + hiveChar + "')");
+            }
+          }
+          break;
+        case VARCHAR:
+          {
+            byte[] stringBytes = Arrays.copyOfRange(
+                deserializeRead.currentBytes,
+                deserializeRead.currentBytesStart,
+                deserializeRead.currentBytesStart + deserializeRead.currentBytesLength);
+            Text text = new Text(stringBytes);
+            String string = text.toString();
 
-        HiveVarchar hiveVarchar = new HiveVarchar(string, ((VarcharTypeInfo) primitiveTypeInfo).getLength());
+            HiveVarchar hiveVarchar = new HiveVarchar(string, ((VarcharTypeInfo) primitiveTypeInfo).getLength());
 
-        HiveVarchar expected = ((HiveVarcharWritable) writable).getHiveVarchar();
-        if (!hiveVarchar.equals(expected)) {
-          TestCase.fail("Varchar field mismatch (expected '" + expected + "' found '" + hiveVarchar + "')");
-        }
-      }
-      break;
-    case DECIMAL:
-      {
-        HiveDecimal value = deserializeRead.currentHiveDecimalWritable.getHiveDecimal();
-        if (value == null) {
-          TestCase.fail("Decimal field evaluated to NULL");
-        }
-        HiveDecimal expected = ((HiveDecimalWritable) writable).getHiveDecimal();
-        if (!value.equals(expected)) {
-          DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) primitiveTypeInfo;
-          int precision = decimalTypeInfo.getPrecision();
-          int scale = decimalTypeInfo.getScale();
-          TestCase.fail("Decimal field mismatch (expected " + expected.toString() + " found " + value.toString() + ") precision " + precision + ", scale " + scale);
+            HiveVarchar expected = ((HiveVarcharWritable) object).getHiveVarchar();
+            if (!hiveVarchar.equals(expected)) {
+              TestCase.fail("Varchar field mismatch (expected '" + expected + "' found '" + hiveVarchar + "')");
+            }
+          }
+          break;
+        case DECIMAL:
+          {
+            HiveDecimal value = deserializeRead.currentHiveDecimalWritable.getHiveDecimal();
+            if (value == null) {
+              TestCase.fail("Decimal field evaluated to NULL");
+            }
+            HiveDecimal expected = ((HiveDecimalWritable) object).getHiveDecimal();
+            if (!value.equals(expected)) {
+              DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) primitiveTypeInfo;
+              int precision = decimalTypeInfo.getPrecision();
+              int scale = decimalTypeInfo.getScale();
+              TestCase.fail("Decimal field mismatch (expected " + expected.toString() + " found " + value.toString() + ") precision " + precision + ", scale " + scale);
+            }
+          }
+          break;
+        case DATE:
+          {
+            Date value = deserializeRead.currentDateWritable.get();
+            Date expected = ((DateWritable) object).get();
+            if (!value.equals(expected)) {
+              TestCase.fail("Date field mismatch (expected " + expected.toString() + " found " + value.toString() + ")");
+            }
+          }
+          break;
+        case TIMESTAMP:
+          {
+            Timestamp value = deserializeRead.currentTimestampWritable.getTimestamp();
+            Timestamp expected = ((TimestampWritable) object).getTimestamp();
+            if (!value.equals(expected)) {
+              TestCase.fail("Timestamp field mismatch (expected " + expected.toString() + " found " + value.toString() + ")");
+            }
+          }
+          break;
+        case INTERVAL_YEAR_MONTH:
+          {
+            HiveIntervalYearMonth value = deserializeRead.currentHiveIntervalYearMonthWritable.getHiveIntervalYearMonth();
+            HiveIntervalYearMonth expected = ((HiveIntervalYearMonthWritable) object).getHiveIntervalYearMonth();
+            if (!value.equals(expected)) {
+              TestCase.fail("HiveIntervalYearMonth field mismatch (expected " + expected.toString() + " found " + value.toString() + ")");
+            }
+          }
+          break;
+        case INTERVAL_DAY_TIME:
+          {
+            HiveIntervalDayTime value = deserializeRead.currentHiveIntervalDayTimeWritable.getHiveIntervalDayTime();
+            HiveIntervalDayTime expected = ((HiveIntervalDayTimeWritable) object).getHiveIntervalDayTime();
+            if (!value.equals(expected)) {
+              TestCase.fail("HiveIntervalDayTime field mismatch (expected " + expected.toString() + " found " + value.toString() + ")");
+            }
+          }
+          break;
+        case BINARY:
+          {
+            byte[] byteArray = Arrays.copyOfRange(
+                deserializeRead.currentBytes,
+                deserializeRead.currentBytesStart,
+                deserializeRead.currentBytesStart + deserializeRead.currentBytesLength);
+            BytesWritable bytesWritable = (BytesWritable) object;
+            byte[] expected = Arrays.copyOfRange(bytesWritable.getBytes(), 0, bytesWritable.getLength());
+            if (byteArray.length != expected.length){
+              TestCase.fail("Byte Array field mismatch (expected " + Arrays.toString(expected)
+                  + " found " + Arrays.toString(byteArray) + ")");
+            }
+            for (int b = 0; b < byteArray.length; b++) {
+              if (byteArray[b] != expected[b]) {
+                TestCase.fail("Byte Array field mismatch (expected " + Arrays.toString(expected)
+                    + " found " + Arrays.toString(byteArray) + ")");
+              }
+            }
+          }
+          break;
+        default:
+          throw new Error("Unknown primitive category " + primitiveTypeInfo.getPrimitiveCategory());
         }
       }
       break;
-    case DATE:
-      {
-        Date value = deserializeRead.currentDateWritable.get();
-        Date expected = ((DateWritable) writable).get();
-        if (!value.equals(expected)) {
-          TestCase.fail("Date field mismatch (expected " + expected.toString() + " found " + value.toString() + ")");
+    case LIST:
+    case MAP:
+    case STRUCT:
+    case UNION:
+      throw new Error("Complex types need to be handled separately");
+    default:
+      throw new Error("Unknown category " + typeInfo.getCategory());
+    }
+  }
+
+  public static void serializeWrite(SerializeWrite serializeWrite,
+      TypeInfo typeInfo, Object object) throws IOException {
+    if (object == null) {
+      serializeWrite.writeNull();
+      return;
+    }
+    switch (typeInfo.getCategory()) {
+    case PRIMITIVE:
+      {
+        PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
+        switch (primitiveTypeInfo.getPrimitiveCategory()) {
+        case BOOLEAN:
+          {
+            boolean value = ((BooleanWritable) object).get();
+            serializeWrite.writeBoolean(value);
+          }
+          break;
+        case BYTE:
+          {
+            byte value = ((ByteWritable) object).get();
+            serializeWrite.writeByte(value);
+          }
+          break;
+        case SHORT:
+          {
+            short value = ((ShortWritable) object).get();
+            serializeWrite.writeShort(value);
+          }
+          break;
+        case INT:
+          {
+            int value = ((IntWritable) object).get();
+            serializeWrite.writeInt(value);
+          }
+          break;
+        case LONG:
+          {
+            long value = ((LongWritable) object).get();
+            serializeWrite.writeLong(value);
+          }
+          break;
+        case FLOAT:
+          {
+            float value = ((FloatWritable) object).get();
+            serializeWrite.writeFloat(value);
+          }
+          break;
+        case DOUBLE:
+          {
+            double value = ((DoubleWritable) object).get();
+            serializeWrite.writeDouble(value);
+          }
+          break;
+        case STRING:
+          {
+            Text value = (Text) object;
+            byte[] stringBytes = value.getBytes();
+            int stringLength = stringBytes.length;
+            serializeWrite.writeString(stringBytes, 0, stringLength);
+          }
+          break;
+        case CHAR:
+          {
+            HiveChar value = ((HiveCharWritable) object).getHiveChar();
+            serializeWrite.writeHiveChar(value);
+          }
+          break;
+        case VARCHAR:
+          {
+            HiveVarchar value = ((HiveVarcharWritable) object).getHiveVarchar();
+            serializeWrite.writeHiveVarchar(value);
+          }
+          break;
+        case DECIMAL:
+          {
+            HiveDecimal value = ((HiveDecimalWritable) object).getHiveDecimal();
+            DecimalTypeInfo decTypeInfo = (DecimalTypeInfo)primitiveTypeInfo;
+            serializeWrite.writeHiveDecimal(value, decTypeInfo.scale());
+          }
+          break;
+        case DATE:
+          {
+            Date value = ((DateWritable) object).get();
+            serializeWrite.writeDate(value);
+          }
+          break;
+        case TIMESTAMP:
+          {
+            Timestamp value = ((TimestampWritable) object).getTimestamp();
+            serializeWrite.writeTimestamp(value);
+          }
+          break;
+        case INTERVAL_YEAR_MONTH:
+          {
+            HiveIntervalYearMonth value = ((HiveIntervalYearMonthWritable) object).getHiveIntervalYearMonth();
+            serializeWrite.writeHiveIntervalYearMonth(value);
+          }
+          break;
+        case INTERVAL_DAY_TIME:
+          {
+            HiveIntervalDayTime value = ((HiveIntervalDayTimeWritable) object).getHiveIntervalDayTime();
+            serializeWrite.writeHiveIntervalDayTime(value);
+          }
+          break;
+        case BINARY:
+          {
+            BytesWritable byteWritable = (BytesWritable) object;
+            byte[] binaryBytes = byteWritable.getBytes();
+            int length = byteWritable.getLength();
+            serializeWrite.writeBinary(binaryBytes, 0, length);
+          }
+          break;
+        default:
+          throw new Error("Unknown primitive category " + primitiveTypeInfo.getPrimitiveCategory().name());
         }
       }
       break;
-    case TIMESTAMP:
+    case LIST:
       {
-        Timestamp value = deserializeRead.currentTimestampWritable.getTimestamp();
-        Timestamp expected = ((TimestampWritable) writable).getTimestamp();
-        if (!value.equals(expected)) {
-          TestCase.fail("Timestamp field mismatch (expected " + expected.toString() + " found " + value.toString() + ")");
+        ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
+        TypeInfo elementTypeInfo = listTypeInfo.getListElementTypeInfo();
+        ArrayList<Object> elements = (ArrayList<Object>) object;
+        serializeWrite.beginList(elements);
+        boolean isFirst = true;
+        for (Object elementObject : elements) {
+          if (isFirst) {
+            isFirst = false;
+          } else {
+            serializeWrite.separateList();
+          }
+          if (elementObject == null) {
+            serializeWrite.writeNull();
+          } else {
+            serializeWrite(serializeWrite, elementTypeInfo, elementObject);
+          }
         }
-      }
-      break;
-    case INTERVAL_YEAR_MONTH:
-      {
-        HiveIntervalYearMonth value = deserializeRead.currentHiveIntervalYearMonthWritable.getHiveIntervalYearMonth();
-        HiveIntervalYearMonth expected = ((HiveIntervalYearMonthWritable) writable).getHiveIntervalYearMonth();
-        if (!value.equals(expected)) {
-          TestCase.fail("HiveIntervalYearMonth field mismatch (expected " + expected.toString() + " found " + value.toString() + ")");
+        serializeWrite.finishList();
+      }
+      break;
+    case MAP:
+      {
+        MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
+        TypeInfo keyTypeInfo = mapTypeInfo.getMapKeyTypeInfo();
+        TypeInfo valueTypeInfo = mapTypeInfo.getMapValueTypeInfo();
+        HashMap<Object, Object> hashMap = (HashMap<Object, Object>) object;
+        serializeWrite.beginMap(hashMap);
+        boolean isFirst = true;
+        for (Map.Entry<Object, Object> entry : hashMap.entrySet()) {
+          if (isFirst) {
+            isFirst = false;
+          } else {
+            serializeWrite.separateKeyValuePair();
+          }
+          if (entry.getKey() == null) {
+            serializeWrite.writeNull();
+          } else {
+            serializeWrite(serializeWrite, keyTypeInfo, entry.getKey());
+          }
+          serializeWrite.separateKey();
+          if (entry.getValue() == null) {
+            serializeWrite.writeNull();
+          } else {
+            serializeWrite(serializeWrite, valueTypeInfo, entry.getValue());
+          }
         }
-      }
-      break;
-    case INTERVAL_DAY_TIME:
-      {
-        HiveIntervalDayTime value = deserializeRead.currentHiveIntervalDayTimeWritable.getHiveIntervalDayTime();
-        HiveIntervalDayTime expected = ((HiveIntervalDayTimeWritable) writable).getHiveIntervalDayTime();
-        if (!value.equals(expected)) {
-          TestCase.fail("HiveIntervalDayTime field mismatch (expected " + expected.toString() + " found " + value.toString() + ")");
+        serializeWrite.finishMap();
+      }
+      break;
+    case STRUCT:
+      {
+        StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+        ArrayList<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+        ArrayList<Object> fieldValues = (ArrayList<Object>) object;
+        final int size = fieldValues.size();
+        serializeWrite.beginStruct(fieldValues);
+        boolean isFirst = true;
+        for (int i = 0; i < size; i++) {
+          if (isFirst) {
+            isFirst = false;
+          } else {
+            serializeWrite.separateStruct();
+          }
+          serializeWrite(serializeWrite, fieldTypeInfos.get(i), fieldValues.get(i));
         }
+        serializeWrite.finishStruct();
       }
       break;
-    case BINARY:
+    case UNION:
       {
-        byte[] byteArray = Arrays.copyOfRange(
-            deserializeRead.currentBytes,
-            deserializeRead.currentBytesStart,
-            deserializeRead.currentBytesStart + deserializeRead.currentBytesLength);
-        BytesWritable bytesWritable = (BytesWritable) writable;
-        byte[] expected = Arrays.copyOfRange(bytesWritable.getBytes(), 0, bytesWritable.getLength());
-        if (byteArray.length != expected.length){
-          TestCase.fail("Byte Array field mismatch (expected " + Arrays.toString(expected)
-              + " found " + Arrays.toString(byteArray) + ")");
-        }
-        for (int b = 0; b < byteArray.length; b++) {
-          if (byteArray[b] != expected[b]) {
-            TestCase.fail("Byte Array field mismatch (expected " + Arrays.toString(expected)
-              + " found " + Arrays.toString(byteArray) + ")");
-          }
-        }
+        UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
+        List<TypeInfo> fieldTypeInfos = unionTypeInfo.getAllUnionObjectTypeInfos();
+        final int size = fieldTypeInfos.size();
+        StandardUnionObjectInspector.StandardUnion standardUnion = (StandardUnionObjectInspector.StandardUnion) object;
+        byte tag = standardUnion.getTag();
+        serializeWrite.beginUnion(tag);
+        serializeWrite(serializeWrite, fieldTypeInfos.get(tag), standardUnion.getObject());
+        serializeWrite.finishUnion();
       }
       break;
     default:
-      throw new Error("Unknown primitive category " + primitiveTypeInfo.getPrimitiveCategory());
+      throw new Error("Unknown category " + typeInfo.getCategory().name());
     }
   }
 
-  public static void serializeWrite(SerializeWrite serializeWrite,
-      PrimitiveTypeInfo primitiveTypeInfo, Writable writable) throws IOException {
-    if (writable == null) {
-      serializeWrite.writeNull();
-      return;
+  public Object readComplexPrimitiveField(DeserializeRead deserializeRead,
+                                          PrimitiveTypeInfo primitiveTypeInfo) throws IOException {
+    boolean isNull = !deserializeRead.readComplexField();
+    if (isNull) {
+      return null;
+    } else {
+      return doReadComplexPrimitiveField(deserializeRead, primitiveTypeInfo);
     }
+  }
+
+  private static Object doReadComplexPrimitiveField(DeserializeRead deserializeRead,
+                                                    PrimitiveTypeInfo primitiveTypeInfo) throws IOException {
     switch (primitiveTypeInfo.getPrimitiveCategory()) {
-      case BOOLEAN:
-      {
-        boolean value = ((BooleanWritable) writable).get();
-        serializeWrite.writeBoolean(value);
-      }
-      break;
+    case BOOLEAN:
+      return new BooleanWritable(deserializeRead.currentBoolean);
     case BYTE:
-      {
-        byte value = ((ByteWritable) writable).get();
-        serializeWrite.writeByte(value);
-      }
-      break;
+      return new ByteWritable(deserializeRead.currentByte);
     case SHORT:
-      {
-        short value = ((ShortWritable) writable).get();
-        serializeWrite.writeShort(value);
-      }
-      break;
+      return new ShortWritable(deserializeRead.currentShort);
     case INT:
-      {
-        int value = ((IntWritable) writable).get();
-        serializeWrite.writeInt(value);
-      }
-      break;
+      return new IntWritable(deserializeRead.currentInt);
     case LONG:
-      {
-        long value = ((LongWritable) writable).get();
-        serializeWrite.writeLong(value);
-      }
-      break;
+      return new LongWritable(deserializeRead.currentLong);
     case FLOAT:
-      {
-        float value = ((FloatWritable) writable).get();
-        serializeWrite.writeFloat(value);
-      }
-      break;
+      return new FloatWritable(deserializeRead.currentFloat);
     case DOUBLE:
-      {
-        double value = ((DoubleWritable) writable).get();
-        serializeWrite.writeDouble(value);
-      }
-      break;
+      return new DoubleWritable(deserializeRead.currentDouble);
     case STRING:
-      {
-        Text value = (Text) writable;
-        byte[] stringBytes = value.getBytes();
-        int stringLength = stringBytes.length;
-        serializeWrite.writeString(stringBytes, 0, stringLength);
-      }
-      break;
+      return new Text(new String(
+          deserializeRead.currentBytes,
+          deserializeRead.currentBytesStart,
+          deserializeRead.currentBytesLength,
+          StandardCharsets.UTF_8));
     case CHAR:
-      {
-        HiveChar value = ((HiveCharWritable) writable).getHiveChar();
-        serializeWrite.writeHiveChar(value);
-      }
-      break;
+      return new HiveCharWritable(new HiveChar(
+          new String(
+              deserializeRead.currentBytes,
+              deserializeRead.currentBytesStart,
+              deserializeRead.currentBytesLength,
+              StandardCharsets.UTF_8),
+          ((CharTypeInfo) primitiveTypeInfo).getLength()));
     case VARCHAR:
-      {
-        HiveVarchar value = ((HiveVarcharWritable) writable).getHiveVarchar();
-        serializeWrite.writeHiveVarchar(value);
-      }
-      break;
+      if (deserializeRead.currentBytes == null) {
+        throw new RuntimeException();
+      }
+      return new HiveVarcharWritable(new HiveVarchar(
+          new String(
+              deserializeRead.currentBytes,
+              deserializeRead.currentBytesStart,
+              deserializeRead.currentBytesLength,
+              StandardCharsets.UTF_8),
+          ((VarcharTypeInfo) primitiveTypeInfo).getLength()));
     case DECIMAL:
-      {
-        HiveDecimal value = ((HiveDecimalWritable) writable).getHiveDecimal();
-        DecimalTypeInfo decTypeInfo = (DecimalTypeInfo)primitiveTypeInfo;
-        serializeWrite.writeHiveDecimal(value, decTypeInfo.scale());
-      }
-      break;
+      return new HiveDecimalWritable(deserializeRead.currentHiveDecimalWritable);
     case DATE:
-      {
-        Date value = ((DateWritable) writable).get();
-        serializeWrite.writeDate(value);
-      }
-      break;
+      return new DateWritable(deserializeRead.currentDateWritable);
     case TIMESTAMP:
-      {
-        Timestamp value = ((TimestampWritable) writable).getTimestamp();
-        serializeWrite.writeTimestamp(value);
-      }
-      break;
+      return new TimestampWritable(deserializeRead.currentTimestampWritable);
     case INTERVAL_YEAR_MONTH:
-      {
-        HiveIntervalYearMonth value = ((HiveIntervalYearMonthWritable) writable).getHiveIntervalYearMonth();
-        serializeWrite.writeHiveIntervalYearMonth(value);
-      }
-      break;
+      return new HiveIntervalYearMonthWritable(deserializeRead.currentHiveIntervalYearMonthWritable);
     case INTERVAL_DAY_TIME:
-      {
-        HiveIntervalDayTime value = ((HiveIntervalDayTimeWritable) writable).getHiveIntervalDayTime();
-        serializeWrite.writeHiveIntervalDayTime(value);
-      }
-      break;
+      return new HiveIntervalDayTimeWritable(deserializeRead.currentHiveIntervalDayTimeWritable);
     case BINARY:
-      {
-        BytesWritable byteWritable = (BytesWritable) writable;
-        byte[] binaryBytes = byteWritable.getBytes();
-        int length = byteWritable.getLength();
-        serializeWrite.writeBinary(binaryBytes, 0, length);
+      return new BytesWritable(
+          Arrays.copyOfRange(
+              deserializeRead.currentBytes,
+              deserializeRead.currentBytesStart,
+              deserializeRead.currentBytesLength + deserializeRead.currentBytesStart));
+    default:
+      throw new Error("Unknown primitive category " + primitiveTypeInfo.getPrimitiveCategory());
+    }
+  }
+
+  public static Object deserializeReadComplexType(DeserializeRead deserializeRead,
+      TypeInfo typeInfo) throws IOException {
+
+    boolean isNull = !deserializeRead.readNextField();
+    if (isNull) {
+      return null;
+    }
+    return getComplexField(deserializeRead, typeInfo);
+  }
+
+  static int fake = 0;
+
+  private static Object getComplexField(DeserializeRead deserializeRead,
+                                        TypeInfo typeInfo) throws IOException {
+    switch (typeInfo.getCategory()) {
+    case PRIMITIVE:
+      return doReadComplexPrimitiveField(deserializeRead, (PrimitiveTypeInfo) typeInfo);
+    case LIST:
+      {
+        ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
+        TypeInfo elementTypeInfo = listTypeInfo.getListElementTypeInfo();
+        ArrayList<Object> list = new ArrayList<Object>();
+        Object eleObj;
+        boolean isNull;
+        while (deserializeRead.isNextComplexMultiValue()) {
+          isNull = !deserializeRead.readComplexField();
+          if (isNull) {
+            eleObj = null;
+          } else {
+            eleObj = getComplexField(deserializeRead, elementTypeInfo);
+          }
+          list.add(eleObj);
+        }
+        return list;
+      }
+    case MAP:
+      {
+        MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
+        TypeInfo keyTypeInfo = mapTypeInfo.getMapKeyTypeInfo();
+        TypeInfo valueTypeInfo = mapTypeInfo.getMapValueTypeInfo();
+        HashMap<Object, Object> hashMap = new HashMap<Object, Object>();
+        Object keyObj;
+        Object valueObj;
+        boolean isNull;
+        while (deserializeRead.isNextComplexMultiValue()) {
+          isNull = !deserializeRead.readComplexField();
+          if (isNull) {
+            keyObj = null;
+          } else {
+            keyObj = getComplexField(deserializeRead, keyTypeInfo);
+          }
+          isNull = !deserializeRead.readComplexField();
+          if (isNull) {
+            valueObj = null;
+          } else {
+            valueObj = getComplexField(deserializeRead, valueTypeInfo);
+          }
+          hashMap.put(keyObj, valueObj);
+        }
+        return hashMap;
+      }
+    case STRUCT:
+      {
+        StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+        ArrayList<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+        final int size = fieldTypeInfos.size();
+        ArrayList<Object> fieldValues = new ArrayList<Object>();
+        Object fieldObj;
+        boolean isNull;
+        for (int i = 0; i < size; i++) {
+          isNull = !deserializeRead.readComplexField();
+          if (isNull) {
+            fieldObj = null;
+          } else {
+            fieldObj = getComplexField(deserializeRead, fieldTypeInfos.get(i));
+          }
+          fieldValues.add(fieldObj);
+        }
+        deserializeRead.finishComplexVariableFieldsType();
+        return fieldValues;
+      }
+    case UNION:
+      {
+        UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
+        List<TypeInfo> unionTypeInfos = unionTypeInfo.getAllUnionObjectTypeInfos();
+        final int size = unionTypeInfos.size();
+        Object tagObj;
+        int tag;
+        Object unionObj;
+        boolean isNull = !deserializeRead.readComplexField();
+        if (isNull) {
+          unionObj = null;
+        } else {
+          // Get the tag value.
+          tagObj = getComplexField(deserializeRead, TypeInfoFactory.intTypeInfo);
+          tag = ((IntWritable) tagObj).get();
+
+          isNull = !deserializeRead.readComplexField();
+          if (isNull) {
+            unionObj = null;
+          } else {
+            // Get the union value.
+            unionObj = new StandardUnionObjectInspector.StandardUnion((byte) tag, getComplexField(deserializeRead, unionTypeInfos.get(tag)));
+          }
+        }
+
+        deserializeRead.finishComplexVariableFieldsType();
+        return unionObj;
       }
-      break;
     default:
-      throw new Error("Unknown primitive category " + primitiveTypeInfo.getPrimitiveCategory().name());
+      throw new Error("Unexpected category " + typeInfo.getCategory());
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/d467e172/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java b/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java
index 19d4550..d9160d7 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java
@@ -19,13 +19,19 @@
 package org.apache.hadoop.hive.serde2.binarysortable.fast;
 
 import java.io.IOException;
-import java.math.BigInteger;
+import java.util.ArrayDeque;
 import java.util.Arrays;
-import java.nio.charset.StandardCharsets;
-
+import java.util.Deque;
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.common.type.FastHiveDecimal;
 import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
 import org.apache.hadoop.hive.serde2.binarysortable.InputByteBuffer;
 import org.apache.hadoop.hive.serde2.fast.DeserializeRead;
@@ -57,12 +63,6 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
   byte[] columnNullMarker;
   byte[] columnNotNullMarker;
 
-  // Which field we are on.  We start with -1 so readNextField can increment once and the read
-  // field data methods don't increment.
-  private int fieldIndex;
-
-  private int fieldCount;
-
   private int start;
   private int end;
   private int fieldStart;
@@ -78,19 +78,40 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
 
   private InputByteBuffer inputByteBuffer = new InputByteBuffer();
 
+  private Field root;
+  private Deque<Field> stack;
+
+  private class Field {
+    Field[] children;
+
+    Category category;
+    PrimitiveObjectInspector.PrimitiveCategory primitiveCategory;
+    TypeInfo typeInfo;
+
+    int index;
+    int count;
+    int start;
+    int tag;
+  }
+
   /*
    * Use this constructor when only ascending sort order is used.
    */
-  public BinarySortableDeserializeRead(PrimitiveTypeInfo[] primitiveTypeInfos,
-      boolean useExternalBuffer) {
-    this(primitiveTypeInfos, useExternalBuffer, null, null, null);
+  public BinarySortableDeserializeRead(TypeInfo[] typeInfos, boolean useExternalBuffer) {
+    this(typeInfos, useExternalBuffer, null, null, null);
   }
 
   public BinarySortableDeserializeRead(TypeInfo[] typeInfos, boolean useExternalBuffer,
           boolean[] columnSortOrderIsDesc, byte[] columnNullMarker, byte[] columnNotNullMarker) {
     super(typeInfos, useExternalBuffer);
     final int count = typeInfos.length;
-    fieldCount = count;
+
+    root = new Field();
+    root.category = Category.STRUCT;
+    root.children = createFields(typeInfos);
+    root.count = count;
+    stack = new ArrayDeque<>();
+
     if (columnSortOrderIsDesc != null) {
       this.columnSortOrderIsDesc = columnSortOrderIsDesc;
     } else {
@@ -131,10 +152,23 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
    */
   @Override
   public void set(byte[] bytes, int offset, int length) {
-    fieldIndex = -1;
     start = offset;
     end = offset + length;
     inputByteBuffer.reset(bytes, start, end);
+    root.index = -1;
+    stack.clear();
+    stack.push(root);
+    clearIndex(root);
+  }
+
+  private void clearIndex(Field field) {
+    field.index = -1;
+    if (field.children == null) {
+      return;
+    }
+    for (Field child : field.children) {
+      clearIndex(child);
+    }
   }
 
   /*
@@ -150,15 +184,15 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
     sb.append(" for length ");
     sb.append(end - start);
     sb.append(" to read ");
-    sb.append(fieldCount);
+    sb.append(root.count);
     sb.append(" fields with types ");
     sb.append(Arrays.toString(typeInfos));
     sb.append(".  ");
-    if (fieldIndex == -1) {
+    if (root.index == -1) {
       sb.append("Before first field?");
     } else {
       sb.append("Read field #");
-      sb.append(fieldIndex);
+      sb.append(root.index);
       sb.append(" at field start position ");
       sb.append(fieldStart);
       sb.append(" current read offset ");
@@ -187,31 +221,17 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
    */
   @Override
   public boolean readNextField() throws IOException {
+    return readComplexField();
+  }
 
-    // We start with fieldIndex as -1 so we can increment once here and then the read
-    // field data methods don't increment.
-    fieldIndex++;
-
-    if (fieldIndex >= fieldCount) {
-      return false;
-    }
-    if (inputByteBuffer.isEof()) {
-      // Also, reading beyond our byte range produces NULL.
-      return false;
-    }
-
-    fieldStart = inputByteBuffer.tell();
-
-    byte isNullByte = inputByteBuffer.read(columnSortOrderIsDesc[fieldIndex]);
-
-    if (isNullByte == columnNullMarker[fieldIndex]) {
-      return false;
-    }
+  private boolean readPrimitive(Field field) throws IOException {
+    final int fieldIndex = root.index;
+    field.start = inputByteBuffer.tell();
 
     /*
      * We have a field and are positioned to it.  Read it.
      */
-    switch (primitiveCategories[fieldIndex]) {
+    switch (field.primitiveCategory) {
     case BOOLEAN:
       currentBoolean = (inputByteBuffer.read(columnSortOrderIsDesc[fieldIndex]) == 2);
       return true;
@@ -392,7 +412,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
         if (!(b == 1 || b == -1 || b == 0)) {
           throw new IOException("Unexpected byte value " + (int)b + " in binary sortable format data (invert " + invert + ")");
         }
-        boolean positive = b != -1;
+        final boolean positive = b != -1;
 
         int factor = inputByteBuffer.read(invert) ^ 0x80;
         for (int i = 0; i < 3; i++) {
@@ -403,7 +423,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
           factor = -factor;
         }
 
-        int decimalStart = inputByteBuffer.tell();
+        final int decimalStart = inputByteBuffer.tell();
         int length = 0;
 
         do {
@@ -434,10 +454,8 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
         // read the null byte again
         inputByteBuffer.read(positive ? invert : !invert);
 
-        String digits = new String(tempDecimalBuffer, 0, length, StandardCharsets.UTF_8);
-
         // Set the value of the writable from the decimal digits that were written with no dot.
-        int scale = length - factor;
+        final int scale = length - factor;
         currentHiveDecimalWritable.setFromDigitsOnlyBytesWithScale(
             !positive, tempDecimalBuffer, 0, length, scale);
         boolean decimalIsNull = !currentHiveDecimalWritable.isSet();
@@ -445,10 +463,10 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
 
           // We have a decimal.  After we enforce precision and scale, will it become a NULL?
 
-          DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfos[fieldIndex];
+          final DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) field.typeInfo;
 
-          int enforcePrecision = decimalTypeInfo.getPrecision();
-          int enforceScale = decimalTypeInfo.getScale();
+          final int enforcePrecision = decimalTypeInfo.getPrecision();
+          final int enforceScale = decimalTypeInfo.getScale();
 
           decimalIsNull =
               !currentHiveDecimalWritable.mutateEnforcePrecisionScale(
@@ -461,7 +479,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
       }
       return true;
     default:
-      throw new RuntimeException("Unexpected primitive type category " + primitiveCategories[fieldIndex]);
+      throw new RuntimeException("Unexpected primitive type category " + field.primitiveCategory);
     }
   }
 
@@ -472,8 +490,53 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
    * Designed for skipping columns that are not included.
    */
   public void skipNextField() throws IOException {
-    // Not a known use case for BinarySortable -- so don't optimize.
-    readNextField();
+    final Field current = stack.peek();
+    current.index++;
+    // Nothing to skip once root's index has reached root.count.
+    if (root.index >= root.count) {
+      return;
+    }
+
+    if (inputByteBuffer.isEof()) {
+      // Also, reading beyond our byte range produces NULL.
+      return;
+    }
+    // Slot 0 of a union is its tag byte; honor the column sort order, as readComplexField does.
+    if (current.category == Category.UNION && current.index == 0) {
+      current.tag = inputByteBuffer.read(columnSortOrderIsDesc[root.index]);
+      currentInt = current.tag;
+      return;
+    }
+
+    final Field child = getChild(current);
+    // isNull() consumes the marker byte in front of the value; a NULL has nothing more to skip.
+    if (isNull()) {
+      return;
+    }
+    if (child.category == Category.PRIMITIVE) {
+      readPrimitive(child);
+    } else {
+      stack.push(child);
+      switch (child.category) {
+      case LIST:
+      case MAP:
+        while (isNextComplexMultiValue()) {
+          skipNextField();
+        }
+        break;
+      case STRUCT:
+        for (int i = 0; i < child.count; i++) {
+          skipNextField();
+        }
+        finishComplexVariableFieldsType();
+        break;
+      case UNION:
+        readComplexField();
+        skipNextField();
+        finishComplexVariableFieldsType();
+        break;
+      }
+    }
   }
 
   @Override
@@ -482,7 +545,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
   }
 
   private void copyToBuffer(byte[] buffer, int bufferStart, int bufferLength) throws IOException {
-    final boolean invert = columnSortOrderIsDesc[fieldIndex];
+    final boolean invert = columnSortOrderIsDesc[root.index];
     inputByteBuffer.seek(bytesStart);
     // 3. Copy the data.
     for (int i = 0; i < bufferLength; i++) {
@@ -516,4 +579,140 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
   public boolean isEndOfInputReached() {
     return inputByteBuffer.isEof();
   }
+
+  private Field[] createFields(TypeInfo[] typeInfos) { // one descriptor per type, same order
+    final Field[] fields = new Field[typeInfos.length];
+    for (int pos = 0; pos < fields.length; pos++) {
+      fields[pos] = createField(typeInfos[pos]);
+    }
+    return fields;
+  }
+
+  // Builds the Field descriptor for typeInfo, recursing into the children of complex types.
+  private Field createField(TypeInfo typeInfo) {
+    final Field field = new Field();
+    final Category category = typeInfo.getCategory();
+    field.category = category;
+    field.typeInfo = typeInfo;
+    switch (category) {
+    case PRIMITIVE:
+      field.primitiveCategory = ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
+      break;
+    case LIST:
+      field.children = new Field[1];
+      field.children[0] = createField(((ListTypeInfo) typeInfo).getListElementTypeInfo());
+      break;
+    case MAP:
+      field.children = new Field[2];
+      field.children[0] = createField(((MapTypeInfo) typeInfo).getMapKeyTypeInfo());
+      field.children[1] = createField(((MapTypeInfo) typeInfo).getMapValueTypeInfo());
+      break;
+    case STRUCT:
+      StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+      List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+      field.count = fieldTypeInfos.size();
+      field.children = createFields(fieldTypeInfos.toArray(new TypeInfo[fieldTypeInfos.size()]));
+      break;
+    case UNION:
+      UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
+      List<TypeInfo> objectTypeInfos = unionTypeInfo.getAllUnionObjectTypeInfos();
+      field.count = 2; // NOTE(review): presumably slot 0 = tag, slot 1 = tagged value; confirm
+      field.children = createFields(objectTypeInfos.toArray(new TypeInfo[objectTypeInfos.size()]));
+      break;
+    default:
+      throw new RuntimeException("Unexpected category " + category);
+    }
+    return field;
+  }
+
+  private Field getChild(Field field) { // child descriptor for field's current position
+    switch (field.category) {
+    case LIST:
+      return field.children[0];
+    case MAP:
+      return field.children[field.index % 2]; // even index -> key, odd index -> value
+    case STRUCT:
+      return field.children[field.index];
+    case UNION:
+      return field.children[field.tag];
+    default:
+      throw new RuntimeException("No children for category " + field.category);
+    }
+  }
+
+  private boolean isNull() throws IOException { // consumes the marker byte before a value
+    final byte marker = inputByteBuffer.read(columnSortOrderIsDesc[root.index]);
+    return marker == columnNullMarker[root.index];
+  }
+
+  /**
+   * Advances the field on top of the stack by one slot and reads the value found there.
+   * A union's first slot yields its tag in currentInt; a non-primitive child is pushed.
+   *
+   * @return false when root.index has reached root.count, at end of input, or on NULL
+   */
+  @Override
+  public boolean readComplexField() throws IOException {
+    final Field parent = stack.peek();
+    parent.index++;
+
+    // Past the last root slot, or beyond our byte range: both produce NULL.
+    if (root.index >= root.count || inputByteBuffer.isEof()) {
+      return false;
+    }
+
+    final boolean atUnionTag = parent.category == Category.UNION && parent.index == 0;
+    if (atUnionTag) {
+      final boolean invert = columnSortOrderIsDesc[root.index];
+      parent.tag = inputByteBuffer.read(invert);
+      currentInt = parent.tag;
+      return true;
+    }
+
+    final Field next = getChild(parent);
+    if (isNull()) {
+      return false;
+    }
+
+    if (next.category != Category.PRIMITIVE) {
+      // Subsequent reads operate on the complex value now on top of the stack.
+      stack.push(next);
+      return true;
+    }
+    return readPrimitive(next);
+  }
+
+  // Reads the marker in front of each list element / map entry: 1 = another follows, 0 = end.
+  @Override
+  public boolean isNextComplexMultiValue() throws IOException {
+    final byte isNullByte = inputByteBuffer.read(columnSortOrderIsDesc[root.index]);
+    final boolean isEnded;
+    switch (isNullByte) {
+    case 0:
+      isEnded = true;
+      break;
+
+    case 1:
+      isEnded = false;
+      break;
+
+    default:
+      throw new RuntimeException("Unexpected multi-value marker byte " + isNullByte);
+    }
+    if (isEnded) {
+      // The terminator was read, so leave the list/map that was on top of the stack.
+      stack.pop();
+      stack.peek();
+    }
+    return !isEnded;
+  }
+
+  @Override
+  public void finishComplexVariableFieldsType() {
+    // Leave the complex field just completed; an enclosing field must remain on the stack.
+    stack.pop();
+    if (stack.peek() == null) {
+      throw new RuntimeException("No enclosing field left on the stack after finishing a complex type");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d467e172/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableSerializeWrite.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableSerializeWrite.java b/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableSerializeWrite.java
index a9ea7c0..5be7714 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableSerializeWrite.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableSerializeWrite.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
@@ -34,7 +36,6 @@ import org.apache.hadoop.hive.serde2.fast.SerializeWrite;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hive.common.util.DateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,8 +58,7 @@ public final class BinarySortableSerializeWrite implements SerializeWrite {
   // Which field we are on.  We start with -1 to be consistent in style with
   // BinarySortableDeserializeRead.
   private int index;
-
-  private int fieldCount;
+  private int level;
 
   private TimestampWritable tempTimestampWritable;
 
@@ -67,7 +67,6 @@ public final class BinarySortableSerializeWrite implements SerializeWrite {
   public BinarySortableSerializeWrite(boolean[] columnSortOrderIsDesc,
           byte[] columnNullMarker, byte[] columnNotNullMarker) {
     this();
-    fieldCount = columnSortOrderIsDesc.length;
     this.columnSortOrderIsDesc = columnSortOrderIsDesc;
     this.columnNullMarker = columnNullMarker;
     this.columnNotNullMarker = columnNotNullMarker;
@@ -79,7 +78,6 @@ public final class BinarySortableSerializeWrite implements SerializeWrite {
    */
   public BinarySortableSerializeWrite(int fieldCount) {
     this();
-    this.fieldCount = fieldCount;
     columnSortOrderIsDesc = new boolean[fieldCount];
     Arrays.fill(columnSortOrderIsDesc, false);
     columnNullMarker = new byte[fieldCount];
@@ -101,6 +99,7 @@ public final class BinarySortableSerializeWrite implements SerializeWrite {
     this.output = output;
     this.output.reset();
     index = -1;
+    level = 0;
   }
 
   /*
@@ -110,6 +109,7 @@ public final class BinarySortableSerializeWrite implements SerializeWrite {
   public void setAppend(Output output) {
     this.output = output;
     index = -1;
+    level = 0;
   }
 
   /*
@@ -119,6 +119,7 @@ public final class BinarySortableSerializeWrite implements SerializeWrite {
   public void reset() {
     output.reset();
     index = -1;
+    level = 0;
   }
 
   /*
@@ -126,23 +127,26 @@ public final class BinarySortableSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeNull() throws IOException {
-    ++index;
+    if (level == 0) {
+      index++;
+    }
     BinarySortableSerDe.writeByte(output, columnNullMarker[index], columnSortOrderIsDesc[index]);
   }
 
+  // Emits the not-null marker, first advancing to the next column when level is 0.
+  private void beginElement() {
+    index += (level == 0) ? 1 : 0;
+    final boolean invert = columnSortOrderIsDesc[index];
+    BinarySortableSerDe.writeByte(output, columnNotNullMarker[index], invert);
+  }
+
   /*
    * BOOLEAN.
    */
   @Override
   public void writeBoolean(boolean v) throws IOException {
-    ++index;
-
-    final boolean invert = columnSortOrderIsDesc[index];
-
-    // This field is not a null.
-    BinarySortableSerDe.writeByte(output, columnNotNullMarker[index], invert);
-
-    BinarySortableSerDe.writeByte(output, (byte) (v ? 2 : 1), invert);
+    beginElement();
+    BinarySortableSerDe.writeByte(output, (byte) (v ? 2 : 1), columnSortOrderIsDesc[index]);
   }
 
   /*
@@ -150,14 +154,8 @@ public final class BinarySortableSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeByte(byte v) throws IOException {
-    ++index;
-
-    final boolean invert = columnSortOrderIsDesc[index];
-
-    // This field is not a null.
-    BinarySortableSerDe.writeByte(output, columnNotNullMarker[index], invert);
-
-    BinarySortableSerDe.writeByte(output, (byte) (v ^ 0x80), invert);
+    beginElement();
+    BinarySortableSerDe.writeByte(output, (byte) (v ^ 0x80), columnSortOrderIsDesc[index]);
   }
 
   /*
@@ -165,14 +163,8 @@ public final class BinarySortableSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeShort(short v) throws IOException {
-    ++index;
-
-    final boolean invert = columnSortOrderIsDesc[index];
-
-    // This field is not a null.
-    BinarySortableSerDe.writeByte(output, columnNotNullMarker[index], invert);
-
-    BinarySortableSerDe.serializeShort(output, v, invert);
+    beginElement();
+    BinarySortableSerDe.serializeShort(output, v, columnSortOrderIsDesc[index]);
   }
 
   /*
@@ -180,14 +172,8 @@ public final class BinarySortableSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeInt(int v) throws IOException {
-    ++index;
-
-    final boolean invert = columnSortOrderIsDesc[index];
-
-    // This field is not a null.
-    BinarySortableSerDe.writeByte(output, columnNotNullMarker[index], invert);
-
-    BinarySortableSerDe.serializeInt(output, v, invert);
+    beginElement();
+    BinarySortableSerDe.serializeInt(output, v, columnSortOrderIsDesc[index]);
   }
 
   /*
@@ -195,14 +181,8 @@ public final class BinarySortableSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeLong(long v) throws IOException {
-    ++index;
-
-    final boolean invert = columnSortOrderIsDesc[index];
-
-    // This field is not a null.
-    BinarySortableSerDe.writeByte(output, columnNotNullMarker[index], invert);
-
-    BinarySortableSerDe.serializeLong(output, v, invert);
+    beginElement();
+    BinarySortableSerDe.serializeLong(output, v, columnSortOrderIsDesc[index]);
   }
 
   /*
@@ -210,14 +190,8 @@ public final class BinarySortableSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeFloat(float vf) throws IOException {
-    ++index;
-
-    final boolean invert = columnSortOrderIsDesc[index];
-
-    // This field is not a null.
-    BinarySortableSerDe.writeByte(output, columnNotNullMarker[index], invert);
-
-    BinarySortableSerDe.serializeFloat(output, vf, invert);
+    beginElement();
+    BinarySortableSerDe.serializeFloat(output, vf, columnSortOrderIsDesc[index]);
   }
 
   /*
@@ -225,14 +199,8 @@ public final class BinarySortableSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeDouble(double vd) throws IOException {
-    ++index;
-
-    final boolean invert = columnSortOrderIsDesc[index];
-
-    // This field is not a null.
-    BinarySortableSerDe.writeByte(output, columnNotNullMarker[index], invert);
-
-    BinarySortableSerDe.serializeDouble(output, vd, invert);
+    beginElement();
+    BinarySortableSerDe.serializeDouble(output, vd, columnSortOrderIsDesc[index]);
   }
 
   /*
@@ -243,26 +211,14 @@ public final class BinarySortableSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeString(byte[] v) throws IOException {
-    ++index;
-
-    final boolean invert = columnSortOrderIsDesc[index];
-
-    // This field is not a null.
-    BinarySortableSerDe.writeByte(output, columnNotNullMarker[index], invert);
-
-    BinarySortableSerDe.serializeBytes(output, v, 0, v.length, invert);
+    beginElement();
+    BinarySortableSerDe.serializeBytes(output, v, 0, v.length, columnSortOrderIsDesc[index]);
   }
 
   @Override
   public void writeString(byte[] v, int start, int length) throws IOException {
-    ++index;
-
-    final boolean invert = columnSortOrderIsDesc[index];
-
-    // This field is not a null.
-    BinarySortableSerDe.writeByte(output, columnNotNullMarker[index], invert);
-
-    BinarySortableSerDe.serializeBytes(output, v, start, length, invert);
+    beginElement();
+    BinarySortableSerDe.serializeBytes(output, v, start, length, columnSortOrderIsDesc[index]);
   }
 
   /*
@@ -290,26 +246,14 @@ public final class BinarySortableSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeBinary(byte[] v) throws IOException {
-    ++index;
-
-    final boolean invert = columnSortOrderIsDesc[index];
-
-    // This field is not a null.
-    BinarySortableSerDe.writeByte(output, columnNotNullMarker[index], invert);
-
-    BinarySortableSerDe.serializeBytes(output, v, 0, v.length, invert);
+    beginElement();
+    BinarySortableSerDe.serializeBytes(output, v, 0, v.length, columnSortOrderIsDesc[index]);
   }
 
   @Override
   public void writeBinary(byte[] v, int start, int length) {
-    ++index;
-
-    final boolean invert = columnSortOrderIsDesc[index];
-
-    // This field is not a null.
-    BinarySortableSerDe.writeByte(output, columnNotNullMarker[index], invert);
-
-    BinarySortableSerDe.serializeBytes(output, v, start, length, invert);
+    beginElement();
+    BinarySortableSerDe.serializeBytes(output, v, start, length, columnSortOrderIsDesc[index]);
   }
 
   /*
@@ -317,27 +261,15 @@ public final class BinarySortableSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeDate(Date date) throws IOException {
-    ++index;
-
-    final boolean invert = columnSortOrderIsDesc[index];
-
-    // This field is not a null.
-    BinarySortableSerDe.writeByte(output, columnNotNullMarker[index], invert);
-
-    BinarySortableSerDe.serializeInt(output, DateWritable.dateToDays(date), invert);
+    beginElement();
+    BinarySortableSerDe.serializeInt(output, DateWritable.dateToDays(date), columnSortOrderIsDesc[index]);
   }
 
   // We provide a faster way to write a date without a Date object.
   @Override
   public void writeDate(int dateAsDays) throws IOException {
-    ++index;
-
-    final boolean invert = columnSortOrderIsDesc[index];
-
-    // This field is not a null.
-    BinarySortableSerDe.writeByte(output, columnNotNullMarker[index], invert);
-
-    BinarySortableSerDe.serializeInt(output, dateAsDays, invert);
+    beginElement();
+    BinarySortableSerDe.serializeInt(output, dateAsDays, columnSortOrderIsDesc[index]);
   }
 
   /*
@@ -345,15 +277,9 @@ public final class BinarySortableSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeTimestamp(Timestamp vt) throws IOException {
-    ++index;
-
-    final boolean invert = columnSortOrderIsDesc[index];
-
-    // This field is not a null.
-    BinarySortableSerDe.writeByte(output, columnNotNullMarker[index], invert);
-
+    beginElement();
     tempTimestampWritable.set(vt);
-    BinarySortableSerDe.serializeTimestampWritable(output, tempTimestampWritable, invert);
+    BinarySortableSerDe.serializeTimestampWritable(output, tempTimestampWritable, columnSortOrderIsDesc[index]);
   }
 
   /*
@@ -361,26 +287,14 @@ public final class BinarySortableSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeHiveIntervalYearMonth(HiveIntervalYearMonth viyt) throws IOException {
-    ++index;
-
-    final boolean invert = columnSortOrderIsDesc[index];
-
-    // This field is not a null.
-    BinarySortableSerDe.writeByte(output, columnNotNullMarker[index], invert);
-
-    BinarySortableSerDe.serializeHiveIntervalYearMonth(output, viyt, invert);
+    beginElement();
+    BinarySortableSerDe.serializeHiveIntervalYearMonth(output, viyt, columnSortOrderIsDesc[index]);
   }
 
   @Override
   public void writeHiveIntervalYearMonth(int totalMonths) throws IOException {
-    ++index;
-
-    final boolean invert = columnSortOrderIsDesc[index];
-
-    // This field is not a null.
-    BinarySortableSerDe.writeByte(output, columnNotNullMarker[index], invert);
-
-    BinarySortableSerDe.serializeInt(output, totalMonths, invert);
+    beginElement();
+    BinarySortableSerDe.serializeInt(output, totalMonths, columnSortOrderIsDesc[index]);
   }
 
   /*
@@ -388,14 +302,8 @@ public final class BinarySortableSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeHiveIntervalDayTime(HiveIntervalDayTime vidt) throws IOException {
-    ++index;
-
-    final boolean invert = columnSortOrderIsDesc[index];
-
-    // This field is not a null.
-    BinarySortableSerDe.writeByte(output, columnNotNullMarker[index], invert);
-
-    BinarySortableSerDe.serializeHiveIntervalDayTime(output, vidt, invert);
+    beginElement();
+    BinarySortableSerDe.serializeHiveIntervalDayTime(output, vidt, columnSortOrderIsDesc[index]);
   }
 
   /*
@@ -406,31 +314,104 @@ public final class BinarySortableSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeHiveDecimal(HiveDecimal dec, int scale) throws IOException {
-    ++index;
-
-    final boolean invert = columnSortOrderIsDesc[index];
-
-    // This field is not a null.
-    BinarySortableSerDe.writeByte(output, columnNotNullMarker[index], invert);
-
+    beginElement();
     if (decimalBytesScratch == null) {
       decimalBytesScratch = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
     }
-    BinarySortableSerDe.serializeHiveDecimal(output, dec, invert, decimalBytesScratch);
+    BinarySortableSerDe.serializeHiveDecimal(output, dec, columnSortOrderIsDesc[index], decimalBytesScratch);
   }
 
   @Override
   public void writeHiveDecimal(HiveDecimalWritable decWritable, int scale) throws IOException {
-    ++index;
+    beginElement();
+    if (decimalBytesScratch == null) {
+      decimalBytesScratch = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
+    }
+    BinarySortableSerDe.serializeHiveDecimal(output, decWritable, columnSortOrderIsDesc[index], decimalBytesScratch);
+  }
 
-    final boolean invert = columnSortOrderIsDesc[index];
+  /*
+   * List
+   */
+  @Override
+  public void beginList(List list) {
+    beginElement();
+    level++;
+    if (!list.isEmpty()) {
+      BinarySortableSerDe.writeByte(output, (byte) 1, columnSortOrderIsDesc[index]);
+    }
+  }
 
-    // This field is not a null.
-    BinarySortableSerDe.writeByte(output, columnNotNullMarker[index], invert);
+  @Override
+  public void separateList() {
+    BinarySortableSerDe.writeByte(output, (byte) 1, columnSortOrderIsDesc[index]);
+  }
 
-    if (decimalBytesScratch == null) {
-      decimalBytesScratch = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
+  @Override
+  public void finishList() {
+    level--;
+    // and \0 to terminate
+    BinarySortableSerDe.writeByte(output, (byte) 0, columnSortOrderIsDesc[index]);
+  }
+
+  /*
+   * Map
+   */
+  @Override
+  public void beginMap(Map<?, ?> map) {
+    beginElement();
+    level++;
+    if (!map.isEmpty()) {
+      BinarySortableSerDe.writeByte(output, (byte) 1, columnSortOrderIsDesc[index]);
     }
-    BinarySortableSerDe.serializeHiveDecimal(output, decWritable, invert, decimalBytesScratch);
+  }
+
+  @Override
+  public void separateKey() {
+  }
+
+  @Override
+  public void separateKeyValuePair() {
+    BinarySortableSerDe.writeByte(output, (byte) 1, columnSortOrderIsDesc[index]);
+  }
+
+  @Override
+  public void finishMap() {
+    level--;
+    // and \0 to terminate
+    BinarySortableSerDe.writeByte(output, (byte) 0, columnSortOrderIsDesc[index]);
+  }
+
+  /*
+   * Struct
+   */
+  @Override
+  public void beginStruct(List fieldValues) {
+    beginElement();
+    level++;
+  }
+
+  @Override
+  public void separateStruct() {
+  }
+
+  @Override
+  public void finishStruct() {
+    level--;
+  }
+
+  /*
+   * Union
+   */
+  @Override
+  public void beginUnion(int tag) throws IOException {
+    beginElement();
+    BinarySortableSerDe.writeByte(output, (byte) tag, columnSortOrderIsDesc[index]);
+    level++;
+  }
+
+  @Override
+  public void finishUnion() {
+    level--;
   }
 }