You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by mm...@apache.org on 2017/05/18 22:44:02 UTC

[3/8] hive git commit: HIVE-16207: Add support for Complex Types in Fast SerDe (Teddy Choi, reviewed by Matt McCline)

http://git-wip-us.apache.org/repos/asf/hive/blob/d467e172/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleSerializeWrite.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleSerializeWrite.java b/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleSerializeWrite.java
index 1401ac3..ef77daf 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleSerializeWrite.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleSerializeWrite.java
@@ -22,6 +22,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.Timestamp;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.codec.binary.Base64;
 import org.slf4j.Logger;
@@ -48,7 +52,6 @@ import org.apache.hadoop.hive.serde2.lazy.LazyTimestamp;
 import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
 import org.apache.hadoop.hive.serde2.fast.SerializeWrite;
 import org.apache.hadoop.io.Text;
-import org.apache.hive.common.util.DateUtils;
 
 /*
  * Directly serialize, field-by-field, the LazyBinary format.
@@ -60,7 +63,7 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
 
   private LazySerDeParameters lazyParams;
 
-  private byte separator;
+  private byte[] separators;
   private boolean[] needsEscape;
   private boolean isEscaped;
   private byte escapeChar;
@@ -70,6 +73,8 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
 
   private int fieldCount;
   private int index;
+  private int currentLevel;
+  private Deque<Integer> indexStack = new ArrayDeque<Integer>();
 
   // For thread safety, we allocate private writable objects for our use only.
   private DateWritable dateWritable;
@@ -80,14 +85,14 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
   private byte[] decimalScratchBuffer;
 
   public LazySimpleSerializeWrite(int fieldCount,
-    byte separator, LazySerDeParameters lazyParams) {
+    LazySerDeParameters lazyParams) {
 
     this();
     this.fieldCount = fieldCount;
-  
-    this.separator = separator;
+
     this.lazyParams = lazyParams;
 
+    separators = lazyParams.getSeparators();
     isEscaped = lazyParams.isEscaped();
     escapeChar = lazyParams.getEscapeChar();
     needsEscape = lazyParams.getNeedsEscape();
@@ -106,6 +111,7 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
     this.output = output;
     output.reset();
     index = 0;
+    currentLevel = 0;
   }
 
   /*
@@ -115,6 +121,7 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
   public void setAppend(Output output) {
     this.output = output;
     index = 0;
+    currentLevel = 0;
   }
 
   /*
@@ -124,35 +131,19 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
   public void reset() {
     output.reset();
     index = 0;
+    currentLevel = 0;
   }
 
   /*
-   * General Pattern:
-   *
-   *  if (index > 0) {
-   *    output.write(separator);
-   *  }
-   *
-   *  WHEN NOT NULL: Write value.
-   *  OTHERWISE NULL: Write nullSequenceBytes.
-   *
-   *  Increment index
-   *
-   */
-
-  /*
    * Write a NULL field.
    */
   @Override
   public void writeNull() throws IOException {
-
-    if (index > 0) {
-      output.write(separator);
-    }
+    beginPrimitive();
 
     output.write(nullSequenceBytes);
 
-    index++;
+    finishPrimitive();
   }
 
   /*
@@ -160,18 +151,13 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeBoolean(boolean v) throws IOException {
-
-    if (index > 0) {
-      output.write(separator);
-    }
-
+    beginPrimitive();
     if (v) {
       output.write(LazyUtils.trueBytes, 0, LazyUtils.trueBytes.length);
     } else {
       output.write(LazyUtils.falseBytes, 0, LazyUtils.falseBytes.length);
     }
-
-    index++;
+    finishPrimitive();
   }
 
   /*
@@ -179,14 +165,9 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeByte(byte v) throws IOException {
-
-    if (index > 0) {
-      output.write(separator);
-    }
-
+    beginPrimitive();
     LazyInteger.writeUTF8(output, v);
-
-    index++;
+    finishPrimitive();
   }
 
   /*
@@ -194,14 +175,9 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeShort(short v) throws IOException {
-
-    if (index > 0) {
-      output.write(separator);
-    }
-
+    beginPrimitive();
     LazyInteger.writeUTF8(output, v);
-
-    index++;
+    finishPrimitive();
   }
 
   /*
@@ -209,14 +185,9 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeInt(int v) throws IOException {
-
-    if (index > 0) {
-      output.write(separator);
-    }
-
+    beginPrimitive();
     LazyInteger.writeUTF8(output, v);
-
-    index++;
+    finishPrimitive();
   }
 
   /*
@@ -224,14 +195,9 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeLong(long v) throws IOException {
-
-    if (index > 0) {
-      output.write(separator);
-    }
-
+    beginPrimitive();
     LazyLong.writeUTF8(output, v);
-
-    index++;
+    finishPrimitive();
   }
 
   /*
@@ -239,15 +205,10 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeFloat(float vf) throws IOException {
-
-    if (index > 0) {
-      output.write(separator);
-    }
-
+    beginPrimitive();
     ByteBuffer b = Text.encode(String.valueOf(vf));
     output.write(b.array(), 0, b.limit());
-
-    index++;
+    finishPrimitive();
   }
 
   /*
@@ -255,15 +216,10 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeDouble(double v) throws IOException  {
-
-    if (index > 0) {
-      output.write(separator);
-    }
-
+    beginPrimitive();
     ByteBuffer b = Text.encode(String.valueOf(v));
     output.write(b.array(), 0, b.limit());
-
-    index++;
+    finishPrimitive();
   }
 
   /*
@@ -274,28 +230,20 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeString(byte[] v) throws IOException  {
-
-    if (index > 0) {
-      output.write(separator);
+    beginPrimitive();
+    if (v.equals(nullSequenceBytes)) {  // NOTE(review): identity comparison on an array with an empty body -- this check has no effect
     }
-
     LazyUtils.writeEscaped(output, v, 0, v.length, isEscaped, escapeChar,
         needsEscape);
-
-    index++;
+    finishPrimitive();
   }
 
   @Override
   public void writeString(byte[] v, int start, int length) throws IOException {
-
-    if (index > 0) {
-      output.write(separator);
-    }
-
+    beginPrimitive();
     LazyUtils.writeEscaped(output, v, start, length, isEscaped, escapeChar,
         needsEscape);
-
-    index++;
+    finishPrimitive();
   }
 
   /*
@@ -303,16 +251,11 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeHiveChar(HiveChar hiveChar) throws IOException {
-
-    if (index > 0) {
-      output.write(separator);
-    }
-
+    beginPrimitive();
     ByteBuffer b = Text.encode(hiveChar.getPaddedValue());
     LazyUtils.writeEscaped(output, b.array(), 0, b.limit(), isEscaped, escapeChar,
         needsEscape);
-
-    index++;
+    finishPrimitive();
   }
 
   /*
@@ -320,16 +263,11 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeHiveVarchar(HiveVarchar hiveVarchar) throws IOException {
-
-    if (index > 0) {
-      output.write(separator);
-    }
-
+    beginPrimitive();
     ByteBuffer b = Text.encode(hiveVarchar.getValue());
     LazyUtils.writeEscaped(output, b.array(), 0, b.limit(), isEscaped, escapeChar,
         needsEscape);
-
-    index++;
+    finishPrimitive();
   }
 
   /*
@@ -337,32 +275,22 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeBinary(byte[] v) throws IOException {
-
-    if (index > 0) {
-      output.write(separator);
-    }
-
+    beginPrimitive();
     byte[] toEncode = new byte[v.length];
     System.arraycopy(v, 0, toEncode, 0, v.length);
     byte[] toWrite = Base64.encodeBase64(toEncode);
     output.write(toWrite, 0, toWrite.length);
-
-    index++;
+    finishPrimitive();
   }
 
   @Override
   public void writeBinary(byte[] v, int start, int length) throws IOException {
-
-    if (index > 0) {
-      output.write(separator);
-    }
-
+    beginPrimitive();
     byte[] toEncode = new byte[length];
     System.arraycopy(v, start, toEncode, 0, length);
     byte[] toWrite = Base64.encodeBase64(toEncode);
     output.write(toWrite, 0, toWrite.length);
-
-    index++;
+    finishPrimitive();
   }
 
   /*
@@ -370,35 +298,25 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeDate(Date date) throws IOException {
-
-    if (index > 0) {
-      output.write(separator);
-    }
-
+    beginPrimitive();
     if (dateWritable == null) {
       dateWritable = new DateWritable();
     }
     dateWritable.set(date);
     LazyDate.writeUTF8(output, dateWritable);
-
-    index++;
+    finishPrimitive();
   }
 
   // We provide a faster way to write a date without a Date object.
   @Override
   public void writeDate(int dateAsDays) throws IOException {
-
-    if (index > 0) {
-      output.write(separator);
-    }
-
+    beginPrimitive();
     if (dateWritable == null) {
       dateWritable = new DateWritable();
     }
     dateWritable.set(dateAsDays);
     LazyDate.writeUTF8(output, dateWritable);
-
-    index++;
+    finishPrimitive();
   }
 
   /*
@@ -406,18 +324,13 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeTimestamp(Timestamp v) throws IOException {
-
-    if (index > 0) {
-      output.write(separator);
-    }
-
+    beginPrimitive();
     if (timestampWritable == null) {
       timestampWritable = new TimestampWritable();
     }
     timestampWritable.set(v);
     LazyTimestamp.writeUTF8(output, timestampWritable);
-
-    index++;
+    finishPrimitive();
   }
 
   /*
@@ -425,35 +338,25 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeHiveIntervalYearMonth(HiveIntervalYearMonth viyt) throws IOException {
-
-    if (index > 0) {
-      output.write(separator);
-    }
-
+    beginPrimitive();
     if (hiveIntervalYearMonthWritable == null) {
       hiveIntervalYearMonthWritable = new HiveIntervalYearMonthWritable();
     }
     hiveIntervalYearMonthWritable.set(viyt);
     LazyHiveIntervalYearMonth.writeUTF8(output, hiveIntervalYearMonthWritable);
-
-    index++;
+    finishPrimitive();
   }
 
 
   @Override
   public void writeHiveIntervalYearMonth(int totalMonths) throws IOException {
-
-    if (index > 0) {
-      output.write(separator);
-    }
-
+    beginPrimitive();
     if (hiveIntervalYearMonthWritable == null) {
       hiveIntervalYearMonthWritable = new HiveIntervalYearMonthWritable();
     }
     hiveIntervalYearMonthWritable.set(totalMonths);
     LazyHiveIntervalYearMonth.writeUTF8(output, hiveIntervalYearMonthWritable);
-
-    index++;
+    finishPrimitive();
   }
 
   /*
@@ -461,18 +364,13 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeHiveIntervalDayTime(HiveIntervalDayTime vidt) throws IOException {
-
-    if (index > 0) {
-      output.write(separator);
-    }
-
+    beginPrimitive();
     if (hiveIntervalDayTimeWritable == null) {
       hiveIntervalDayTimeWritable = new HiveIntervalDayTimeWritable();
     }
     hiveIntervalDayTimeWritable.set(vidt);
     LazyHiveIntervalDayTime.writeUTF8(output, hiveIntervalDayTimeWritable);
-
-    index++;
+    finishPrimitive();
   }
 
   /*
@@ -483,29 +381,119 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeHiveDecimal(HiveDecimal dec, int scale) throws IOException {
-    if (index > 0) {
-      output.write(separator);
-    }
-
+    beginPrimitive();
     if (decimalScratchBuffer == null) {
       decimalScratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
     }
     LazyHiveDecimal.writeUTF8(output, dec, scale, decimalScratchBuffer);
-
-    index++;
+    finishPrimitive();
   }
 
   @Override
   public void writeHiveDecimal(HiveDecimalWritable decWritable, int scale) throws IOException {
-    if (index > 0) {
-      output.write(separator);
-    }
-
+    beginPrimitive();
     if (decimalScratchBuffer == null) {
       decimalScratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
     }
     LazyHiveDecimal.writeUTF8(output, decWritable, scale, decimalScratchBuffer);
+    finishPrimitive();
+  }
+
+  private void beginComplex() {  // Opens a nested value: writes a pending separator, saves index, goes one level deeper.
+    if (index > 0) {
+      output.write(separators[currentLevel]);
+    }
+    indexStack.push(index);  // Popped again by finishComplex().
+
+    // Restart at index 0 so the first write inside the nested value is not preceded by a separator.
+    index = 0;
+
+    // Go one level deeper; writes inside the nested value use separators[currentLevel] of that level.
+    currentLevel++;
+  }
+
+  private void finishComplex() {  // Closes a nested value and counts it as one written element of its parent.
+    currentLevel--;
+    index = indexStack.pop();  // The parent's index as pushed by beginComplex().
+    index++;
+  }
+
+  @Override
+  public void beginList(List list) {  // Opens a LIST value; the list argument is not read here.
+    beginComplex();
+  }
+
+  @Override
+  public void separateList() {  // Writes nothing; beginPrimitive()/beginComplex() emit the separator when index > 0.
+  }
+
+  @Override
+  public void finishList() {  // Closes the LIST value via the common finishComplex() unwinding.
+    finishComplex();
+  }
+
+  @Override
+  public void beginMap(Map<?, ?> map) {  // Opens a MAP value; the map argument is not read here.
+    beginComplex();
+
+    // A MAP uses 2 separator levels: pairs are split at currentLevel - 1, a key from its value at currentLevel.
+    currentLevel++;
+  }
+
+  @Override
+  public void separateKey() {  // Writes the key/value separator (the deeper of the two MAP levels).
+    index = 0;  // The separator is written here, so the value's write must not emit another one.
+    output.write(separators[currentLevel]);
+  }
+
+  @Override
+  public void separateKeyValuePair() {  // Writes the separator between two key/value pairs.
+    index = 0;  // The separator is written here, so the next key's write must not emit another one.
+    output.write(separators[currentLevel - 1]);  // One level above the key/value separator.
+  }
+
+  @Override
+  public void finishMap() {
+    // Undo the extra level added by beginMap() before the common unwinding.
+    currentLevel--;
+
+    finishComplex();
+  }
+
+  @Override
+  public void beginStruct(List fieldValues) {  // Opens a STRUCT value; fieldValues is not read here.
+    beginComplex();
+  }
+
+  @Override
+  public void separateStruct() {  // Writes nothing; beginPrimitive()/beginComplex() emit the separator when index > 0.
+  }
+
+  @Override
+  public void finishStruct() {  // Closes the STRUCT value via the common finishComplex() unwinding.
+    finishComplex();
+  }
+
+  @Override
+  public void beginUnion(int tag) throws IOException {  // Opens a UNION value and writes its tag plus one separator.
+    beginComplex();
+    writeInt(tag);  // index is 0 after beginComplex(), so no separator precedes the tag.
+    output.write(separators[currentLevel]);
+    index = 0;  // The separator was written above, so the union value's write must not emit another one.
+  }
+
+  @Override
+  public void finishUnion() {  // Closes the UNION value via the common finishComplex() unwinding.
+    finishComplex();
+  }
+
+  private void beginPrimitive() {  // Writes the current level's separator unless this is the first element (index 0).
+    if (index > 0) {
+      output.write(separators[currentLevel]);
+    }
+  }
 
+  private void finishPrimitive() {  // Counts one written element so the next write emits a separator first.
     index++;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d467e172/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java b/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java
index e94ae99..8e0a499 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java
@@ -20,19 +20,28 @@ package org.apache.hadoop.hive.serde2.lazybinary.fast;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.util.ArrayDeque;
 import java.util.Arrays;
+import java.util.Deque;
+import java.util.List;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.serde2.fast.DeserializeRead;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils.VInt;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils.VLong;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
 import org.apache.hadoop.io.WritableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /*
  * Directly deserialize with the caller reading field-by-field the LazyBinary serialization format.
@@ -55,26 +64,84 @@ public final class LazyBinaryDeserializeRead extends DeserializeRead {
   private int start;
   private int offset;
   private int end;
-  private int fieldCount;
-  private int fieldStart;
-  private int fieldIndex;
-  private byte nullByte;
+
+  private boolean skipLengthPrefix = false;
 
   // Object to receive results of reading a decoded variable length int or long.
   private VInt tempVInt;
   private VLong tempVLong;
 
+  private Deque<Field> stack = new ArrayDeque<>();
+  private Field root;
+
+  private class Field {  // Node of the type tree built by createField(); the root is set up in the constructor.
+    Field[] children;  // LIST: 1 (element); MAP: 2 (key, value); STRUCT/UNION: one per member; unset for primitives.
+
+    Category category;
+    PrimitiveCategory primitiveCategory;  // Assigned only when category is PRIMITIVE.
+    TypeInfo typeInfo;
+
+    int index;  // Reset to 0 by clearIndex(); the root's value appears as "Read field #" in the diagnostic text.
+    int count;
+    int start;
+    int end;
+    int nullByteStart;
+    byte nullByte;
+    byte tag;
+  }
+
   public LazyBinaryDeserializeRead(TypeInfo[] typeInfos, boolean useExternalBuffer) {
     super(typeInfos, useExternalBuffer);
-    fieldCount = typeInfos.length;
     tempVInt = new VInt();
     tempVLong = new VLong();
     currentExternalBufferNeeded = false;
+
+    root = new Field();
+    root.category = Category.STRUCT;
+    root.children = createFields(typeInfos);
+    root.count = typeInfos.length;
   }
 
-  // Not public since we must have the field count so every 8 fields NULL bytes can be navigated.
-  private LazyBinaryDeserializeRead() {
-    super();
+  private Field[] createFields(TypeInfo[] typeInfos) {  // Builds one Field per TypeInfo, in the same order.
+    final Field[] children = new Field[typeInfos.length];
+    for (int i = 0; i < typeInfos.length; i++) {
+      children[i] = createField(typeInfos[i]);
+    }
+    return children;
+  }
+
+  private Field createField(TypeInfo typeInfo) {  // Recursively builds the Field tree for one type.
+    final Field field = new Field();
+    final Category category = typeInfo.getCategory();
+    field.category = category;
+    field.typeInfo = typeInfo;
+    switch (category) {
+    case PRIMITIVE:
+      field.primitiveCategory = ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
+      break;
+    case LIST:
+      field.children = new Field[1];  // children[0] = element type
+      field.children[0] = createField(((ListTypeInfo) typeInfo).getListElementTypeInfo());
+      break;
+    case MAP:
+      field.children = new Field[2];  // children[0] = key type, children[1] = value type
+      field.children[0] = createField(((MapTypeInfo) typeInfo).getMapKeyTypeInfo());
+      field.children[1] = createField(((MapTypeInfo) typeInfo).getMapValueTypeInfo());
+      break;
+    case STRUCT:
+      final StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+      final List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+      field.children = createFields(fieldTypeInfos.toArray(new TypeInfo[fieldTypeInfos.size()]));
+      break;
+    case UNION:
+      final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
+      final List<TypeInfo> objectTypeInfos = unionTypeInfo.getAllUnionObjectTypeInfos();
+      field.children = createFields(objectTypeInfos.toArray(new TypeInfo[objectTypeInfos.size()]));
+      break;
+    default:
+      throw new RuntimeException();  // NOTE(review): unhandled category; no message or category value is attached
+    }
+    return field;
   }
 
   /*
@@ -86,7 +153,20 @@ public final class LazyBinaryDeserializeRead extends DeserializeRead {
     this.offset = offset;
     start = offset;
     end = offset + length;
-    fieldIndex = 0;
+
+    stack.clear();
+    stack.push(root);
+    clearIndex(root);
+  }
+
+  private void clearIndex(Field field) {  // Resets index to 0 on this field and, recursively, on all descendants.
+    field.index = 0;
+    if (field.children == null) {  // No children array: nothing further to reset.
+      return;
+    }
+    for (Field child : field.children) {
+      clearIndex(child);
+    }
   }
 
   /*
@@ -102,13 +182,13 @@ public final class LazyBinaryDeserializeRead extends DeserializeRead {
     sb.append(" for length ");
     sb.append(end - start);
     sb.append(" to read ");
-    sb.append(fieldCount);
+    sb.append(root.children.length);
     sb.append(" fields with types ");
     sb.append(Arrays.toString(typeInfos));
     sb.append(".  Read field #");
-    sb.append(fieldIndex);
+    sb.append(root.index);
     sb.append(" at field start position ");
-    sb.append(fieldStart);
+    sb.append(root.start);
     sb.append(" current read offset ");
     sb.append(offset);
 
@@ -127,263 +207,196 @@ public final class LazyBinaryDeserializeRead extends DeserializeRead {
    */
   @Override
   public boolean readNextField() throws IOException {
-    if (fieldIndex >= fieldCount) {
-      return false;
-    }
-
-    fieldStart = offset;
+    return readComplexField();
+  }
 
-    if (fieldIndex == 0) {
-      // The rest of the range check for fields after the first is below after checking
-      // the NULL byte.
-      if (offset >= end) {
+  private boolean readPrimitive(Field field) throws IOException {
+    final PrimitiveCategory primitiveCategory = field.primitiveCategory;
+    final TypeInfo typeInfo = field.typeInfo;
+    switch (primitiveCategory) {
+    case BOOLEAN:
+      // No check needed for single byte read.
+      currentBoolean = (bytes[offset++] != 0);
+      break;
+    case BYTE:
+      // No check needed for single byte read.
+      currentByte = bytes[offset++];
+      break;
+    case SHORT:
+      // Last item -- ok to be at end.
+      if (offset + 2 > end) {
         throw new EOFException();
       }
-      nullByte = bytes[offset++];
-    }
-
-    // NOTE: The bit is set to 1 if a field is NOT NULL.    boolean isNull;
-    if ((nullByte & (1 << (fieldIndex % 8))) == 0) {
-
-      // Logically move past this field.
-      fieldIndex++;
-
-      // Every 8 fields we read a new NULL byte.
-      if (fieldIndex < fieldCount) {
-        if ((fieldIndex % 8) == 0) {
-          // Get next null byte.
-          if (offset >= end) {
-            throw new EOFException();
-          }
-          nullByte = bytes[offset++];
-        }
+      currentShort = LazyBinaryUtils.byteArrayToShort(bytes, offset);
+      offset += 2;
+      break;
+    case INT:
+      // Parse the first byte of a vint/vlong to determine the number of bytes.
+      if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) {
+        throw new EOFException();
       }
-      return false;
-    } else {
-
-      // Make sure there is at least one byte that can be read for a value.
-      if (offset >= end) {
+      LazyBinaryUtils.readVInt(bytes, offset, tempVInt);
+      offset += tempVInt.length;
+      currentInt = tempVInt.value;
+      break;
+    case LONG:
+      // Parse the first byte of a vint/vlong to determine the number of bytes.
+      if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) {
         throw new EOFException();
       }
-
-      /*
-       * We have a field and are positioned to it.  Read it.
-       */
-      switch (primitiveCategories[fieldIndex]) {
-      case BOOLEAN:
-        // No check needed for single byte read.
-        currentBoolean = (bytes[offset++] != 0);
-        break;
-      case BYTE:
-        // No check needed for single byte read.
-        currentByte = bytes[offset++];
-        break;
-      case SHORT:
-        // Last item -- ok to be at end.
-        if (offset + 2 > end) {
-          throw new EOFException();
-        }
-        currentShort = LazyBinaryUtils.byteArrayToShort(bytes, offset);
-        offset += 2;
-        break;
-      case INT:
+      LazyBinaryUtils.readVLong(bytes, offset, tempVLong);
+      offset += tempVLong.length;
+      currentLong = tempVLong.value;
+      break;
+    case FLOAT:
+      // Last item -- ok to be at end.
+      if (offset + 4 > end) {
+        throw new EOFException();
+      }
+      currentFloat = Float.intBitsToFloat(LazyBinaryUtils.byteArrayToInt(bytes, offset));
+      offset += 4;
+      break;
+    case DOUBLE:
+      // Last item -- ok to be at end.
+      if (offset + 8 > end) {
+        throw new EOFException();
+      }
+      currentDouble = Double.longBitsToDouble(LazyBinaryUtils.byteArrayToLong(bytes, offset));
+      offset += 8;
+      break;
+
+    case BINARY:
+    case STRING:
+    case CHAR:
+    case VARCHAR:
+      {
+        // using vint instead of 4 bytes
         // Parse the first byte of a vint/vlong to determine the number of bytes.
         if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) {
           throw new EOFException();
         }
         LazyBinaryUtils.readVInt(bytes, offset, tempVInt);
         offset += tempVInt.length;
-        currentInt = tempVInt.value;
-        break;
-      case LONG:
-        // Parse the first byte of a vint/vlong to determine the number of bytes.
-        if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) {
-          throw new EOFException();
-        }
-        LazyBinaryUtils.readVLong(bytes, offset, tempVLong);
-        offset += tempVLong.length;
-        currentLong = tempVLong.value;
-        break;
-      case FLOAT:
+
+        int saveStart = offset;
+        int length = tempVInt.value;
+        offset += length;
         // Last item -- ok to be at end.
-        if (offset + 4 > end) {
+        if (offset > end) {
           throw new EOFException();
         }
-        currentFloat = Float.intBitsToFloat(LazyBinaryUtils.byteArrayToInt(bytes, offset));
-        offset += 4;
-        break;
-      case DOUBLE:
+
+        currentBytes = bytes;
+        currentBytesStart = saveStart;
+        currentBytesLength = length;
+      }
+      break;
+    case DATE:
+      // Parse the first byte of a vint/vlong to determine the number of bytes.
+      if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) {
+        throw new EOFException();
+      }
+      LazyBinaryUtils.readVInt(bytes, offset, tempVInt);
+      offset += tempVInt.length;
+
+      currentDateWritable.set(tempVInt.value);
+      break;
+    case TIMESTAMP:
+      {
+        int length = TimestampWritable.getTotalLength(bytes, offset);
+        int saveStart = offset;
+        offset += length;
         // Last item -- ok to be at end.
-        if (offset + 8 > end) {
+        if (offset > end) {
           throw new EOFException();
         }
-        currentDouble = Double.longBitsToDouble(LazyBinaryUtils.byteArrayToLong(bytes, offset));
-        offset += 8;
-        break;
-
-      case BINARY:
-      case STRING:
-      case CHAR:
-      case VARCHAR:
-        {
-          // using vint instead of 4 bytes
-          // Parse the first byte of a vint/vlong to determine the number of bytes.
-          if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) {
-            throw new EOFException();
-          }
-          LazyBinaryUtils.readVInt(bytes, offset, tempVInt);
-          offset += tempVInt.length;
-
-          int saveStart = offset;
-          int length = tempVInt.value;
-          offset += length;
-          // Last item -- ok to be at end.
-          if (offset > end) {
-            throw new EOFException();
-          }
-
-          currentBytes = bytes;
-          currentBytesStart = saveStart;
-          currentBytesLength = length;
-        }
-        break;
-      case DATE:
+
+        currentTimestampWritable.set(bytes, saveStart);
+      }
+      break;
+    case INTERVAL_YEAR_MONTH:
+      // Parse the first byte of a vint/vlong to determine the number of bytes.
+      if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) {
+        throw new EOFException();
+      }
+      LazyBinaryUtils.readVInt(bytes, offset, tempVInt);
+      offset += tempVInt.length;
+
+      currentHiveIntervalYearMonthWritable.set(tempVInt.value);
+      break;
+    case INTERVAL_DAY_TIME:
+      // The first bounds check requires at least one more byte beyond for 2nd int (hence >=).
+      // Parse the first byte of a vint/vlong to determine the number of bytes.
+      if (offset + WritableUtils.decodeVIntSize(bytes[offset]) >= end) {
+        throw new EOFException();
+      }
+      LazyBinaryUtils.readVLong(bytes, offset, tempVLong);
+      offset += tempVLong.length;
+
+      // Parse the first byte of a vint/vlong to determine the number of bytes.
+      if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) {
+        throw new EOFException();
+      }
+      LazyBinaryUtils.readVInt(bytes, offset, tempVInt);
+      offset += tempVInt.length;
+
+      currentHiveIntervalDayTimeWritable.set(tempVLong.value, tempVInt.value);
+      break;
+    case DECIMAL:
+      {
+        // Since enforcing precision and scale can cause a HiveDecimal to become NULL,
+        // we must read it, enforce it here, and either return NULL or buffer the result.
+
+        // These calls are to see how much data there is. The setFromBytes call below will do the same
+        // readVInt reads but actually unpack the decimal.
+
+        // The first bounds check requires at least one more byte beyond for 2nd int (hence >=).
         // Parse the first byte of a vint/vlong to determine the number of bytes.
-        if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) {
+        if (offset + WritableUtils.decodeVIntSize(bytes[offset]) >= end) {
           throw new EOFException();
         }
         LazyBinaryUtils.readVInt(bytes, offset, tempVInt);
         offset += tempVInt.length;
+        int readScale = tempVInt.value;
 
-        currentDateWritable.set(tempVInt.value);
-        break;
-      case TIMESTAMP:
-        {
-          int length = TimestampWritable.getTotalLength(bytes, offset);
-          int saveStart = offset;
-          offset += length;
-          // Last item -- ok to be at end.
-          if (offset > end) {
-            throw new EOFException();
-          }
-
-          currentTimestampWritable.set(bytes, saveStart);
-        }
-        break;
-      case INTERVAL_YEAR_MONTH:
         // Parse the first byte of a vint/vlong to determine the number of bytes.
         if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) {
           throw new EOFException();
         }
         LazyBinaryUtils.readVInt(bytes, offset, tempVInt);
         offset += tempVInt.length;
-
-        currentHiveIntervalYearMonthWritable.set(tempVInt.value);
-        break;
-      case INTERVAL_DAY_TIME:
-        // The first bounds check requires at least one more byte beyond for 2nd int (hence >=).
-        // Parse the first byte of a vint/vlong to determine the number of bytes.
-        if (offset + WritableUtils.decodeVIntSize(bytes[offset]) >= end) {
+        int saveStart = offset;
+        offset += tempVInt.value;
+        // Last item -- ok to be at end.
+        if (offset > end) {
           throw new EOFException();
         }
-        LazyBinaryUtils.readVLong(bytes, offset, tempVLong);
-        offset += tempVLong.length;
+        int length = offset - saveStart;
 
-        // Parse the first byte of a vint/vlong to determine the number of bytes.
-        if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) {
-          throw new EOFException();
-        }
-        LazyBinaryUtils.readVInt(bytes, offset, tempVInt);
-        offset += tempVInt.length;
+        //   scale = 2, length = 6, value = -6065716379.11
+        //   \002\006\255\114\197\131\083\105
+        //           \255\114\197\131\083\105
 
-        currentHiveIntervalDayTimeWritable.set(tempVLong.value, tempVInt.value);
-        break;
-      case DECIMAL:
-        {
-          // Since enforcing precision and scale can cause a HiveDecimal to become NULL,
-          // we must read it, enforce it here, and either return NULL or buffer the result.
-
-          // These calls are to see how much data there is. The setFromBytes call below will do the same
-          // readVInt reads but actually unpack the decimal.
-
-          // The first bounds check requires at least one more byte beyond for 2nd int (hence >=).
-          // Parse the first byte of a vint/vlong to determine the number of bytes.
-          if (offset + WritableUtils.decodeVIntSize(bytes[offset]) >= end) {
-            throw new EOFException();
-          }
-          LazyBinaryUtils.readVInt(bytes, offset, tempVInt);
-          offset += tempVInt.length;
-          int readScale = tempVInt.value;
-
-          // Parse the first byte of a vint/vlong to determine the number of bytes.
-          if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) {
-            throw new EOFException();
-          }
-          LazyBinaryUtils.readVInt(bytes, offset, tempVInt);
-          offset += tempVInt.length;
-          int saveStart = offset;
-          offset += tempVInt.value;
-          // Last item -- ok to be at end.
-          if (offset > end) {
-            throw new EOFException();
-          }
-          int length = offset - saveStart;
-
-          //   scale = 2, length = 6, value = -6065716379.11
-          //   \002\006\255\114\197\131\083\105
-          //           \255\114\197\131\083\105
-
-          currentHiveDecimalWritable.setFromBigIntegerBytesAndScale(
-              bytes, saveStart, length, readScale);
-          boolean decimalIsNull = !currentHiveDecimalWritable.isSet();
-          if (!decimalIsNull) {
-
-            DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfos[fieldIndex];
-
-            int precision = decimalTypeInfo.getPrecision();
-            int scale = decimalTypeInfo.getScale();
-
-            decimalIsNull = !currentHiveDecimalWritable.mutateEnforcePrecisionScale(precision, scale);
-          }
-          if (decimalIsNull) {
-
-            // Logically move past this field.
-            fieldIndex++;
-
-            // Every 8 fields we read a new NULL byte.
-            if (fieldIndex < fieldCount) {
-              if ((fieldIndex % 8) == 0) {
-                // Get next null byte.
-                if (offset >= end) {
-                  throw new EOFException();
-                }
-                nullByte = bytes[offset++];
-              }
-            }
-            return false;
-          }
-        }
-        break;
+        currentHiveDecimalWritable.setFromBigIntegerBytesAndScale(
+            bytes, saveStart, length, readScale);
+        boolean decimalIsNull = !currentHiveDecimalWritable.isSet();
+        if (!decimalIsNull) {
 
-      default:
-        throw new Error("Unexpected primitive category " + primitiveCategories[fieldIndex].name());
-      }
-    }
+          final DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
 
-    // Logically move past this field.
-    fieldIndex++;
+          final int precision = decimalTypeInfo.getPrecision();
+          final int scale = decimalTypeInfo.getScale();
 
-    // Every 8 fields we read a new NULL byte.
-    if (fieldIndex < fieldCount) {
-      if ((fieldIndex % 8) == 0) {
-        // Get next null byte.
-        if (offset >= end) {
-          throw new EOFException();
+          decimalIsNull = !currentHiveDecimalWritable.mutateEnforcePrecisionScale(precision, scale);
+        }
+        if (decimalIsNull) {
+          return false;
         }
-        nullByte = bytes[offset++];
       }
+      break;
+    default:
+      throw new Error("Unexpected primitive category " + primitiveCategory.name());
     }
-
     return true;
   }
 
@@ -394,8 +407,37 @@ public final class LazyBinaryDeserializeRead extends DeserializeRead {
    * Designed for skipping columns that are not included.
    */
   public void skipNextField() throws IOException {
-    // Not a known use case for LazyBinary -- so don't optimize.
-    readNextField();
+    final Field current = stack.peek();
+    final boolean isNull = isNull(current);
+
+    if (isNull) {
+      current.index++;
+      return;
+    }
+
+    if (readUnionTag(current)) {
+      current.index++;
+      return;
+    }
+
+    final Field child = getChild(current);
+
+    if (child.category == Category.PRIMITIVE) {
+      readPrimitive(child);
+      current.index++;
+    } else {
+      parseHeader(child);
+      stack.push(child);
+
+      for (int i = 0; i < child.count; i++) {
+        skipNextField();
+      }
+      finishComplexVariableFieldsType();
+    }
+
+    if (offset > end) {
+      throw new EOFException();
+    }
   }
 
   /*
@@ -412,4 +454,141 @@ public final class LazyBinaryDeserializeRead extends DeserializeRead {
   public boolean isEndOfInputReached() {
     return (offset == end);
   }
+
+  private boolean isNull(Field field) {
+    final byte b = (byte) (1 << (field.index % 8));
+    switch (field.category) {
+    case PRIMITIVE:
+      return false;
+    case LIST:
+    case MAP:
+      final byte nullByte = bytes[field.nullByteStart + (field.index / 8)];
+      return (nullByte & b) == 0;
+    case STRUCT:
+      if (field.index % 8 == 0) {
+        field.nullByte = bytes[offset++];
+      }
+      return (field.nullByte & b) == 0;
+    case UNION:
+      return false;
+    default:
+      throw new RuntimeException();
+    }
+  }
+
+  private void parseHeader(Field field) {
+    // Init
+    field.index = 0;
+    field.start = offset;
+
+    // Read length
+    if (!skipLengthPrefix) {
+      final int length = LazyBinaryUtils.byteArrayToInt(bytes, offset);
+      offset += 4;
+      field.end = offset + length;
+    }
+
+    switch (field.category) {
+    case LIST:
+    case MAP:
+      // Read count
+      LazyBinaryUtils.readVInt(bytes, offset, tempVInt);
+      if (field.category == Category.LIST) {
+        field.count = tempVInt.value;
+      } else {
+        field.count = tempVInt.value * 2;
+      }
+      offset += tempVInt.length;
+
+      // Null byte start
+      field.nullByteStart = offset;
+      offset += ((field.count) + 7) / 8;
+      break;
+    case STRUCT:
+      field.count = ((StructTypeInfo) field.typeInfo).getAllStructFieldTypeInfos().size();
+      break;
+    case UNION:
+      field.count = 2;
+      break;
+    }
+  }
+
+  private Field getChild(Field field) {
+    switch (field.category) {
+    case LIST:
+      return field.children[0];
+    case MAP:
+      return field.children[field.index % 2];
+    case STRUCT:
+      return field.children[field.index];
+    case UNION:
+      return field.children[field.tag];
+    default:
+      throw new RuntimeException();
+    }
+  }
+
+  private boolean readUnionTag(Field field) {
+    if (field.category == Category.UNION && field.index == 0) {
+      field.tag = bytes[offset++];
+      currentInt = field.tag;
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  // Push or next
+  @Override
+  public boolean readComplexField() throws IOException {
+    final Field current = stack.peek();
+    boolean isNull = isNull(current);
+
+    if (isNull) {
+      current.index++;
+      return false;
+    }
+
+    if (readUnionTag(current)) {
+      current.index++;
+      return true;
+    }
+
+    final Field child = getChild(current);
+
+    if (child.category == Category.PRIMITIVE) {
+      isNull = !readPrimitive(child);
+      current.index++;
+    } else {
+      parseHeader(child);
+      stack.push(child);
+    }
+
+    if (offset > end) {
+      throw new EOFException();
+    }
+    return !isNull;
+  }
+
+  // Pop (list, map)
+  @Override
+  public boolean isNextComplexMultiValue() {
+    Field current = stack.peek();
+    final boolean isNext = current.index < current.count;
+    if (!isNext) {
+      stack.pop();
+      stack.peek().index++;
+    }
+    return isNext;
+  }
+
+  // Pop (struct, union)
+  @Override
+  public void finishComplexVariableFieldsType() {
+    stack.pop();
+    if (stack.peek() == null) {
+      throw new RuntimeException();
+    }
+    stack.peek().index++;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d467e172/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinarySerializeWrite.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinarySerializeWrite.java b/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinarySerializeWrite.java
index 085d71c..e50ff5e 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinarySerializeWrite.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinarySerializeWrite.java
@@ -21,7 +21,13 @@ package org.apache.hadoop.hive.serde2.lazybinary.fast;
 import java.io.IOException;
 import java.sql.Date;
 import java.sql.Timestamp;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
 
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.type.HiveChar;
@@ -38,7 +44,11 @@ import org.apache.hadoop.hive.serde2.io.TimestampWritable;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
 import org.apache.hadoop.hive.serde2.fast.SerializeWrite;
-import org.apache.hive.common.util.DateUtils;
+
+import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.LIST;
+import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.MAP;
+import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.STRUCT;
+import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.UNION;
 
 /*
  * Directly serialize, field-by-field, the LazyBinary format.
@@ -50,10 +60,8 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
 
   private Output output;
 
-  private int fieldCount;
-  private int fieldIndex;
-  private byte nullByte;
-  private long nullOffset;
+  private int rootFieldCount;
+  private boolean skipLengthPrefix = false;
 
   // For thread safety, we allocate private writable objects for our use only.
   private TimestampWritable timestampWritable;
@@ -64,10 +72,30 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
   private long[] scratchLongs;
   private byte[] scratchBuffer;
 
+  private Field root;
+  private Deque<Field> stack = new ArrayDeque<>();
+  private LazyBinarySerDe.BooleanRef warnedOnceNullMapKey;
+
+  private static class Field {
+    Category type;
+
+    int fieldCount;
+    int fieldIndex;
+    int byteSizeStart;
+    int start;
+    long nullOffset;
+    byte nullByte;
+
+    Field(Category type) {
+      this.type = type;
+    }
+  }
+
   public LazyBinarySerializeWrite(int fieldCount) {
     this();
     vLongBytes = new byte[LazyBinaryUtils.VLONG_BYTES_LEN];
-    this.fieldCount = fieldCount;
+    this.rootFieldCount = fieldCount;
+    resetWithoutOutput();
   }
 
   // Not public since we must have the field count and other information.
@@ -81,9 +109,7 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
   public void set(Output output) {
     this.output = output;
     output.reset();
-    fieldIndex = 0;
-    nullByte = 0;
-    nullOffset = 0;
+    resetWithoutOutput();
   }
 
   /*
@@ -92,9 +118,8 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
   @Override
   public void setAppend(Output output) {
     this.output = output;
-    fieldIndex = 0;
-    nullByte = 0;
-    nullOffset = output.getLength();
+    resetWithoutOutput();
+    root.nullOffset = output.getLength();
   }
 
   /*
@@ -103,57 +128,45 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
   @Override
   public void reset() {
     output.reset();
-    fieldIndex = 0;
-    nullByte = 0;
-    nullOffset = 0;
+    resetWithoutOutput();
   }
 
-  /*
-   * General Pattern:
-   *
-   *  // Every 8 fields we write a NULL byte.
-   *  IF ((fieldIndex % 8) == 0), then
-   *    IF (fieldIndex > 0), then
-   *       Write back previous NullByte
-   *       NullByte = 0
-   *       Remember write position
-   *    Allocate room for next NULL byte.
-   *
-   *  WHEN NOT NULL: Set bit in NULL byte; Write value.
-   *  OTHERWISE NULL: We do not set a bit in the nullByte when we are writing a null.
-   *
-   *  Increment fieldIndex
-   *
-   *  IF (fieldIndex == fieldCount), then
-   *     Write back final NullByte
-   *
-   */
+  private void resetWithoutOutput() {
+    root = new Field(STRUCT);
+    root.fieldCount = rootFieldCount;
+    stack.clear();
+    stack.push(root);
+    warnedOnceNullMapKey = null;
+  }
 
   /*
    * Write a NULL field.
    */
   @Override
   public void writeNull() throws IOException {
-
-    // Every 8 fields we write a NULL byte.
-    if ((fieldIndex % 8) == 0) {
-      if (fieldIndex > 0) {
-        // Write back previous 8 field's NULL byte.
-        output.writeByte(nullOffset, nullByte);
-        nullByte = 0;
-        nullOffset = output.getLength();
+    final Field current = stack.peek();
+
+    if (current.type == STRUCT) {
+      // Every 8 fields we write a NULL byte.
+      if ((current.fieldIndex % 8) == 0) {
+        if (current.fieldIndex > 0) {
+          // Write back the previous 8 fields' NULL byte.
+          output.writeByte(current.nullOffset, current.nullByte);
+          current.nullByte = 0;
+          current.nullOffset = output.getLength();
+        }
+        // Allocate next NULL byte.
+        output.reserve(1);
       }
-      // Allocate next NULL byte.
-      output.reserve(1);
-    }
 
-    // We DO NOT set a bit in the NULL byte when we are writing a NULL.
+      // We DO NOT set a bit in the NULL byte when we are writing a NULL.
 
-    fieldIndex++;
+      current.fieldIndex++;
 
-    if (fieldIndex == fieldCount) {
-      // Write back the final NULL byte before the last fields.
-      output.writeByte(nullOffset, nullByte);
+      if (current.fieldIndex == current.fieldCount) {
+        // Write back the final NULL byte, which precedes the last group of fields.
+        output.writeByte(current.nullOffset, current.nullByte);
+      }
     }
   }
 
@@ -162,30 +175,9 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeBoolean(boolean v) throws IOException {
-
-    // Every 8 fields we write a NULL byte.
-    if ((fieldIndex % 8) == 0) {
-      if (fieldIndex > 0) {
-        // Write back previous 8 field's NULL byte.
-        output.writeByte(nullOffset, nullByte);
-        nullByte = 0;
-        nullOffset = output.getLength();
-      }
-      // Allocate next NULL byte.
-      output.reserve(1);
-    }
-
-    // Set bit in NULL byte when a field is NOT NULL.
-    nullByte |= 1 << (fieldIndex % 8);
-
+    beginElement();
     output.write((byte) (v ? 1 : 0));
-
-    fieldIndex++;
-
-    if (fieldIndex == fieldCount) {
-      // Write back the final NULL byte before the last fields.
-      output.writeByte(nullOffset, nullByte);
-    }
+    finishElement();
   }
 
   /*
@@ -193,30 +185,9 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeByte(byte v) throws IOException {
-
-    // Every 8 fields we write a NULL byte.
-    if ((fieldIndex % 8) == 0) {
-      if (fieldIndex > 0) {
-        // Write back previous 8 field's NULL byte.
-        output.writeByte(nullOffset, nullByte);
-        nullByte = 0;
-        nullOffset = output.getLength();
-      }
-      // Allocate next NULL byte.
-      output.reserve(1);
-    }
-
-    // Set bit in NULL byte when a field is NOT NULL.
-    nullByte |= 1 << (fieldIndex % 8);
-
+    beginElement();
     output.write(v);
-
-    fieldIndex++;
-
-    if (fieldIndex == fieldCount) {
-      // Write back the final NULL byte before the last fields.
-      output.writeByte(nullOffset, nullByte);
-    }
+    finishElement();
   }
 
   /*
@@ -224,31 +195,10 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeShort(short v) throws IOException {
-
-    // Every 8 fields we write a NULL byte.
-    if ((fieldIndex % 8) == 0) {
-      if (fieldIndex > 0) {
-        // Write back previous 8 field's NULL byte.
-        output.writeByte(nullOffset, nullByte);
-        nullByte = 0;
-        nullOffset = output.getLength();
-      }
-      // Allocate next NULL byte.
-      output.reserve(1);
-    }
-
-    // Set bit in NULL byte when a field is NOT NULL.
-    nullByte |= 1 << (fieldIndex % 8);
-
+    beginElement();
     output.write((byte) (v >> 8));
     output.write((byte) (v));
-
-    fieldIndex++;
-
-    if (fieldIndex == fieldCount) {
-      // Write back the final NULL byte before the last fields.
-      output.writeByte(nullOffset, nullByte);
-    }
+    finishElement();
   }
 
   /*
@@ -256,30 +206,9 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeInt(int v) throws IOException {
-
-    // Every 8 fields we write a NULL byte.
-    if ((fieldIndex % 8) == 0) {
-      if (fieldIndex > 0) {
-        // Write back previous 8 field's NULL byte.
-        output.writeByte(nullOffset, nullByte);
-        nullByte = 0;
-        nullOffset = output.getLength();
-      }
-      // Allocate next NULL byte.
-      output.reserve(1);
-    }
-
-    // Set bit in NULL byte when a field is NOT NULL.
-    nullByte |= 1 << (fieldIndex % 8);
-
+    beginElement();
     writeVInt(v);
-
-    fieldIndex++;
-
-    if (fieldIndex == fieldCount) {
-      // Write back the final NULL byte before the last fields.
-      output.writeByte(nullOffset, nullByte);
-    }
+    finishElement();
   }
 
   /*
@@ -287,30 +216,9 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeLong(long v) throws IOException {
-
-    // Every 8 fields we write a NULL byte.
-    if ((fieldIndex % 8) == 0) {
-      if (fieldIndex > 0) {
-        // Write back previous 8 field's NULL byte.
-        output.writeByte(nullOffset, nullByte);
-        nullByte = 0;
-        nullOffset = output.getLength();
-      }
-      // Allocate next NULL byte.
-      output.reserve(1);
-    }
-
-    // Set bit in NULL byte when a field is NOT NULL.
-    nullByte |= 1 << (fieldIndex % 8);
-
+    beginElement();
     writeVLong(v);
-
-    fieldIndex++;
-
-    if (fieldIndex == fieldCount) {
-      // Write back the final NULL byte before the last fields.
-      output.writeByte(nullOffset, nullByte);
-    }
+    finishElement();
   }
 
   /*
@@ -318,34 +226,13 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeFloat(float vf) throws IOException {
-
-    // Every 8 fields we write a NULL byte.
-    if ((fieldIndex % 8) == 0) {
-      if (fieldIndex > 0) {
-        // Write back previous 8 field's NULL byte.
-        output.writeByte(nullOffset, nullByte);
-        nullByte = 0;
-        nullOffset = output.getLength();
-      }
-      // Allocate next NULL byte.
-      output.reserve(1);
-    }
-
-    // Set bit in NULL byte when a field is NOT NULL.
-    nullByte |= 1 << (fieldIndex % 8);
-
+    beginElement();
     int v = Float.floatToIntBits(vf);
     output.write((byte) (v >> 24));
     output.write((byte) (v >> 16));
     output.write((byte) (v >> 8));
     output.write((byte) (v));
-
-    fieldIndex++;
-
-    if (fieldIndex == fieldCount) {
-      // Write back the final NULL byte before the last fields.
-      output.writeByte(nullOffset, nullByte);
-    }
+    finishElement();
   }
 
   /*
@@ -353,97 +240,32 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeDouble(double v) throws IOException {
-
-    // Every 8 fields we write a NULL byte.
-    if ((fieldIndex % 8) == 0) {
-      if (fieldIndex > 0) {
-        // Write back previous 8 field's NULL byte.
-        output.writeByte(nullOffset, nullByte);
-        nullByte = 0;
-        nullOffset = output.getLength();
-      }
-      // Allocate next NULL byte.
-      output.reserve(1);
-    }
-
-    // Set bit in NULL byte when a field is NOT NULL.
-    nullByte |= 1 << (fieldIndex % 8);
-
+    beginElement();
     LazyBinaryUtils.writeDouble(output, v);
-
-    fieldIndex++;
-
-    if (fieldIndex == fieldCount) {
-      // Write back the final NULL byte before the last fields.
-      output.writeByte(nullOffset, nullByte);
-    }
+    finishElement();
   }
 
   /*
    * STRING.
-   * 
+   *
    * Can be used to write CHAR and VARCHAR when the caller takes responsibility for
    * truncation/padding issues.
    */
   @Override
   public void writeString(byte[] v) throws IOException {
-
-    // Every 8 fields we write a NULL byte.
-    if ((fieldIndex % 8) == 0) {
-      if (fieldIndex > 0) {
-        // Write back previous 8 field's NULL byte.
-        output.writeByte(nullOffset, nullByte);
-        nullByte = 0;
-        nullOffset = output.getLength();
-      }
-      // Allocate next NULL byte.
-      output.reserve(1);
-    }
-
-    // Set bit in NULL byte when a field is NOT NULL.
-    nullByte |= 1 << (fieldIndex % 8);
-
-    int length = v.length;
+    beginElement();
+    final int length = v.length;
     writeVInt(length);
-
     output.write(v, 0, length);
-
-    fieldIndex++;
-
-    if (fieldIndex == fieldCount) {
-      // Write back the final NULL byte before the last fields.
-      output.writeByte(nullOffset, nullByte);
-    }
+    finishElement();
   }
 
   @Override
   public void writeString(byte[] v, int start, int length) throws IOException {
-
-    // Every 8 fields we write a NULL byte.
-    if ((fieldIndex % 8) == 0) {
-      if (fieldIndex > 0) {
-        // Write back previous 8 field's NULL byte.
-        output.writeByte(nullOffset, nullByte);
-        nullByte = 0;
-        nullOffset = output.getLength();
-      }
-      // Allocate next NULL byte.
-      output.reserve(1);
-    }
-
-    // Set bit in NULL byte when a field is NOT NULL.
-    nullByte |= 1 << (fieldIndex % 8);
-
+    beginElement();
     writeVInt(length);
-
     output.write(v, start, length);
-
-    fieldIndex++;
-
-    if (fieldIndex == fieldCount) {
-      // Write back the final NULL byte before the last fields.
-      output.writeByte(nullOffset, nullByte);
-    }
+    finishElement();
   }
 
   /*
@@ -451,8 +273,8 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeHiveChar(HiveChar hiveChar) throws IOException {
-    String string = hiveChar.getStrippedValue();
-    byte[] bytes = string.getBytes();
+    final String string = hiveChar.getStrippedValue();
+    final byte[] bytes = string.getBytes();
     writeString(bytes);
   }
 
@@ -461,8 +283,8 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeHiveVarchar(HiveVarchar hiveVarchar) throws IOException {
-    String string = hiveVarchar.getValue();
-    byte[] bytes = string.getBytes();
+    final String string = hiveVarchar.getValue();
+    final byte[] bytes = string.getBytes();
     writeString(bytes);
   }
 
@@ -484,59 +306,17 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeDate(Date date) throws IOException {
-
-    // Every 8 fields we write a NULL byte.
-    if ((fieldIndex % 8) == 0) {
-      if (fieldIndex > 0) {
-        // Write back previous 8 field's NULL byte.
-        output.writeByte(nullOffset, nullByte);
-        nullByte = 0;
-        nullOffset = output.getLength();
-      }
-      // Allocate next NULL byte.
-      output.reserve(1);
-    }
-
-    // Set bit in NULL byte when a field is NOT NULL.
-    nullByte |= 1 << (fieldIndex % 8);
-
+    beginElement();
     writeVInt(DateWritable.dateToDays(date));
-
-    fieldIndex++;
-
-    if (fieldIndex == fieldCount) {
-      // Write back the final NULL byte before the last fields.
-      output.writeByte(nullOffset, nullByte);
-    }
+    finishElement();
   }
 
   // We provide a faster way to write a date without a Date object.
   @Override
   public void writeDate(int dateAsDays) throws IOException {
-
-    // Every 8 fields we write a NULL byte.
-    if ((fieldIndex % 8) == 0) {
-      if (fieldIndex > 0) {
-        // Write back previous 8 field's NULL byte.
-        output.writeByte(nullOffset, nullByte);
-        nullByte = 0;
-        nullOffset = output.getLength();
-      }
-      // Allocate next NULL byte.
-      output.reserve(1);
-    }
-
-    // Set bit in NULL byte when a field is NOT NULL.
-    nullByte |= 1 << (fieldIndex % 8);
-
+    beginElement();
     writeVInt(dateAsDays);
-
-    fieldIndex++;
-
-    if (fieldIndex == fieldCount) {
-      // Write back the final NULL byte before the last fields.
-      output.writeByte(nullOffset, nullByte);
-    }
+    finishElement();
   }
 
   /*
@@ -544,34 +324,13 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeTimestamp(Timestamp v) throws IOException {
-
-    // Every 8 fields we write a NULL byte.
-    if ((fieldIndex % 8) == 0) {
-      if (fieldIndex > 0) {
-        // Write back previous 8 field's NULL byte.
-        output.writeByte(nullOffset, nullByte);
-        nullByte = 0;
-        nullOffset = output.getLength();
-      }
-      // Allocate next NULL byte.
-      output.reserve(1);
-    }
-
-    // Set bit in NULL byte when a field is NOT NULL.
-    nullByte |= 1 << (fieldIndex % 8);
-
+    beginElement();
     if (timestampWritable == null) {
       timestampWritable = new TimestampWritable();
     }
     timestampWritable.set(v);
     timestampWritable.writeToByteStream(output);
-
-    fieldIndex++;
-
-    if (fieldIndex == fieldCount) {
-      // Write back the final NULL byte before the last fields.
-      output.writeByte(nullOffset, nullByte);
-    }
+    finishElement();
   }
 
   /*
@@ -579,66 +338,24 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeHiveIntervalYearMonth(HiveIntervalYearMonth viyt) throws IOException {
-
-    // Every 8 fields we write a NULL byte.
-    if ((fieldIndex % 8) == 0) {
-      if (fieldIndex > 0) {
-        // Write back previous 8 field's NULL byte.
-        output.writeByte(nullOffset, nullByte);
-        nullByte = 0;
-        nullOffset = output.getLength();
-      }
-      // Allocate next NULL byte.
-      output.reserve(1);
-    }
-
-    // Set bit in NULL byte when a field is NOT NULL.
-    nullByte |= 1 << (fieldIndex % 8);
-
+    beginElement();
     if (hiveIntervalYearMonthWritable == null) {
       hiveIntervalYearMonthWritable = new HiveIntervalYearMonthWritable();
     }
     hiveIntervalYearMonthWritable.set(viyt);
     hiveIntervalYearMonthWritable.writeToByteStream(output);
-
-    fieldIndex++;
-
-    if (fieldIndex == fieldCount) {
-      // Write back the final NULL byte before the last fields.
-      output.writeByte(nullOffset, nullByte);
-    }
+    finishElement();
   }
 
   @Override
   public void writeHiveIntervalYearMonth(int totalMonths) throws IOException {
-
-    // Every 8 fields we write a NULL byte.
-    if ((fieldIndex % 8) == 0) {
-      if (fieldIndex > 0) {
-        // Write back previous 8 field's NULL byte.
-        output.writeByte(nullOffset, nullByte);
-        nullByte = 0;
-        nullOffset = output.getLength();
-      }
-      // Allocate next NULL byte.
-      output.reserve(1);
-    }
-
-    // Set bit in NULL byte when a field is NOT NULL.
-    nullByte |= 1 << (fieldIndex % 8);
-
+    beginElement();
     if (hiveIntervalYearMonthWritable == null) {
       hiveIntervalYearMonthWritable = new HiveIntervalYearMonthWritable();
     }
     hiveIntervalYearMonthWritable.set(totalMonths);
     hiveIntervalYearMonthWritable.writeToByteStream(output);
-
-    fieldIndex++;
-
-    if (fieldIndex == fieldCount) {
-      // Write back the final NULL byte before the last fields.
-      output.writeByte(nullOffset, nullByte);
-    }
+    finishElement();
   }
 
   /*
@@ -646,34 +363,13 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeHiveIntervalDayTime(HiveIntervalDayTime vidt) throws IOException {
-
-    // Every 8 fields we write a NULL byte.
-    if ((fieldIndex % 8) == 0) {
-      if (fieldIndex > 0) {
-        // Write back previous 8 field's NULL byte.
-        output.writeByte(nullOffset, nullByte);
-        nullByte = 0;
-        nullOffset = output.getLength();
-      }
-      // Allocate next NULL byte.
-      output.reserve(1);
-    }
-
-    // Set bit in NULL byte when a field is NOT NULL.
-    nullByte |= 1 << (fieldIndex % 8);
-
+    beginElement();
     if (hiveIntervalDayTimeWritable == null) {
       hiveIntervalDayTimeWritable = new HiveIntervalDayTimeWritable();
     }
     hiveIntervalDayTimeWritable.set(vidt);
     hiveIntervalDayTimeWritable.writeToByteStream(output);
-
-    fieldIndex++;
-
-    if (fieldIndex == fieldCount) {
-      // Write back the final NULL byte before the last fields.
-      output.writeByte(nullOffset, nullByte);
-    }
+    finishElement();
   }
 
   /*
@@ -684,22 +380,7 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
    */
   @Override
   public void writeHiveDecimal(HiveDecimal dec, int scale) throws IOException {
-
-    // Every 8 fields we write a NULL byte.
-    if ((fieldIndex % 8) == 0) {
-      if (fieldIndex > 0) {
-        // Write back previous 8 field's NULL byte.
-        output.writeByte(nullOffset, nullByte);
-        nullByte = 0;
-        nullOffset = output.getLength();
-      }
-      // Allocate next NULL byte.
-      output.reserve(1);
-    }
-
-    // Set bit in NULL byte when a field is NOT NULL.
-    nullByte |= 1 << (fieldIndex % 8);
-
+    beginElement();
     if (scratchLongs == null) {
       scratchLongs = new long[HiveDecimal.SCRATCH_LONGS_LEN];
       scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_BIG_INTEGER_BYTES];
@@ -709,33 +390,12 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
         dec,
         scratchLongs,
         scratchBuffer);
-
-    fieldIndex++;
-
-    if (fieldIndex == fieldCount) {
-      // Write back the final NULL byte before the last fields.
-      output.writeByte(nullOffset, nullByte);
-    }
+    finishElement();
   }
 
   @Override
   public void writeHiveDecimal(HiveDecimalWritable decWritable, int scale) throws IOException {
-
-    // Every 8 fields we write a NULL byte.
-    if ((fieldIndex % 8) == 0) {
-      if (fieldIndex > 0) {
-        // Write back previous 8 field's NULL byte.
-        output.writeByte(nullOffset, nullByte);
-        nullByte = 0;
-        nullOffset = output.getLength();
-      }
-      // Allocate next NULL byte.
-      output.reserve(1);
-    }
-
-    // Set bit in NULL byte when a field is NOT NULL.
-    nullByte |= 1 << (fieldIndex % 8);
-
+    beginElement();
     if (scratchLongs == null) {
       scratchLongs = new long[HiveDecimal.SCRATCH_LONGS_LEN];
       scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_BIG_INTEGER_BYTES];
@@ -745,13 +405,7 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
         decWritable,
         scratchLongs,
         scratchBuffer);
-
-    fieldIndex++;
-
-    if (fieldIndex == fieldCount) {
-      // Write back the final NULL byte before the last fields.
-      output.writeByte(nullOffset, nullByte);
-    }
+    finishElement();
   }
 
   /*
@@ -767,4 +421,241 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
     final int len = LazyBinaryUtils.writeVLongToByteArray(vLongBytes, v);
     output.write(vLongBytes, 0, len);
   }
+
+  @Override
+  public void beginList(List list) {  // optional size slot, element count, then null bits
+    final Field current = new Field(LIST);
+    beginComplex(current);  // runs beginElement() for the parent, then pushes this field
+
+    final int size = list.size();
+    current.fieldCount = size;
+
+    if (!skipLengthPrefix) {
+      // 1/ reserve four bytes for the byte size of the list (an int);
+      // finishList() back-fills them once the end offset is known
+      current.byteSizeStart = output.getLength();
+      output.reserve(4);
+      current.start = output.getLength();  // the byte size is measured from this offset
+    }
+    // 2/ write the number of elements as a VInt
+    LazyBinaryUtils.writeVInt(output, size);
+
+    // 3/ write the null bytes: one bit per element, eight elements per byte
+    byte nullByte = 0;
+    for (int eid = 0; eid < size; eid++) {
+      // set the bit to 1 if an element is not null
+      if (null != list.get(eid)) {
+        nullByte |= 1 << (eid % 8);
+      }
+      // flush the byte after every eighth element or
+      // after the last element
+      if (7 == eid % 8 || eid == size - 1) {
+        output.write(nullByte);
+        nullByte = 0;
+      }
+    }
+  }
+
+  @Override
+  public void separateList() {  // no-op: this method writes nothing between list elements
+  }
+
+  @Override
+  public void finishList() {
+    final Field current = stack.peek();  // expected to be the Field pushed by beginList()
+
+    if (!skipLengthPrefix) {
+      // 5/ back-fill the reserved four bytes with the list's byte size
+      int listEnd = output.getLength();
+      int listSize = listEnd - current.start;
+      writeSizeAtOffset(output, current.byteSizeStart, listSize);
+    }
+
+    finishComplex();  // pops this field, then runs finishElement() for the parent
+  }
+
+  @Override
+  public void beginMap(Map<?, ?> map) {  // optional size slot, entry count, then null bits
+    final Field current = new Field(MAP);
+    beginComplex(current);  // runs beginElement() for the parent, then pushes this field
+
+    if (!skipLengthPrefix) {
+      // 1/ reserve four bytes for the byte size of the map (an int);
+      // finishMap() back-fills them once the end offset is known
+      current.byteSizeStart = output.getLength();
+      output.reserve(4);
+      current.start = output.getLength();  // the byte size is measured from this offset
+    }
+
+    // 2/ write the number of entries as a VInt
+    final int size = map.size();
+    current.fieldIndex = size;  // NOTE(review): beginList() uses fieldCount here; confirm
+    LazyBinaryUtils.writeVInt(output, size);
+
+    // 3/ write the null bytes: two bits per entry (key first, then value)
+    int b = 0;
+    byte nullByte = 0;
+    for (Map.Entry<?, ?> entry : map.entrySet()) {
+      // set the bit to 1 if a key is not null
+      if (null != entry.getKey()) {
+        nullByte |= 1 << (b % 8);
+      } else if (warnedOnceNullMapKey != null) {
+        if (!warnedOnceNullMapKey.value) {  // warn only while the flag is still unset
+          LOG.warn("Null map key encountered! Ignoring similar problems.");
+        }
+        warnedOnceNullMapKey.value = true;
+      }
+      b++;
+      // set the bit to 1 if a value is not null
+      if (null != entry.getValue()) {
+        nullByte |= 1 << (b % 8);
+      }
+      b++;
+      // flush the byte after every 4 key-value pairs (8 bits)
+      // or after the last key-value pair
+      if (0 == b % 8 || b == size * 2) {
+        output.write(nullByte);
+        nullByte = 0;
+      }
+    }
+  }
+
+  @Override
+  public void separateKey() {  // no-op: this method writes nothing between a key and its value
+  }
+
+  @Override
+  public void separateKeyValuePair() {  // no-op: this method writes nothing between entries
+  }
+
+  @Override
+  public void finishMap() {
+    final Field current = stack.peek();  // expected to be the Field pushed by beginMap()
+
+    if (!skipLengthPrefix) {
+      // 5/ back-fill the reserved four bytes with the map's byte size
+      int mapEnd = output.getLength();
+      int mapSize = mapEnd - current.start;
+      writeSizeAtOffset(output, current.byteSizeStart, mapSize);
+    }
+
+    finishComplex();  // pops this field, then runs finishElement() for the parent
+  }
+
+  @Override
+  public void beginStruct(List fieldValues) {
+    final Field current = new Field(STRUCT);
+    beginComplex(current);  // runs beginElement() for the parent, then pushes this field
+
+    current.fieldCount = fieldValues.size();  // finishElement() compares fieldIndex to this
+
+    if (!skipLengthPrefix) {
+      // 1/ reserve four bytes for the byte size of the struct (an int);
+      // finishStruct() back-fills them once the end offset is known
+      current.byteSizeStart = output.getLength();
+      output.reserve(4);
+      current.start = output.getLength();  // the byte size is measured from this offset
+    }
+    current.nullOffset = output.getLength();  // offset the first NULL byte is written back to
+  }
+
+  @Override
+  public void separateStruct() {  // no-op: this method writes nothing between struct fields
+  }
+
+  @Override
+  public void finishStruct() {
+    final Field current = stack.peek();  // expected to be the Field pushed by beginStruct()
+
+    if (!skipLengthPrefix) {
+      // 3/ back-fill the reserved four bytes with the struct's byte size
+      int typeEnd = output.getLength();
+      int typeSize = typeEnd - current.start;
+      writeSizeAtOffset(output, current.byteSizeStart, typeSize);
+    }
+
+    finishComplex();  // pops this field, then runs finishElement() for the parent
+  }
+
+  @Override
+  public void beginUnion(int tag) throws IOException {
+    final Field current = new Field(UNION);
+    beginComplex(current);  // runs beginElement() for the parent, then pushes this field
+
+    current.fieldCount = 1;
+
+    if (!skipLengthPrefix) {
+      // 1/ reserve four bytes for the byte size of the union (an int);
+      // finishUnion() back-fills them once the end offset is known
+      current.byteSizeStart = output.getLength();
+      output.reserve(4);
+      current.start = output.getLength();  // the byte size is measured from this offset
+    }
+
+    // 2/ write the union tag; the caller then writes the tagged value
+    output.write(tag);
+  }
+
+  @Override
+  public void finishUnion() {
+    final Field current = stack.peek();  // expected to be the Field pushed by beginUnion()
+
+    if (!skipLengthPrefix) {
+      // 3/ back-fill the reserved four bytes with the union's byte size
+      int typeEnd = output.getLength();
+      int typeSize = typeEnd - current.start;
+      writeSizeAtOffset(output, current.byteSizeStart, typeSize);
+    }
+
+    finishComplex();  // pops this field, then runs finishElement() for the parent
+  }
+
+  private void beginElement() {  // NULL-bit bookkeeping done before a value is written
+    final Field current = stack.peek();
+
+    if (current.type == STRUCT) {  // other field types are left untouched here
+      // Every 8 fields we start a new NULL byte.
+      if ((current.fieldIndex % 8) == 0) {
+        if (current.fieldIndex > 0) {
+          // Write back the previous 8 fields' NULL byte.
+          output.writeByte(current.nullOffset, current.nullByte);
+          current.nullByte = 0;
+          current.nullOffset = output.getLength();
+        }
+        // Allocate next NULL byte.
+        output.reserve(1);
+      }
+
+      // Set bit in NULL byte when a field is NOT NULL.
+      current.nullByte |= 1 << (current.fieldIndex % 8);
+    }
+  }
+
+  private void finishElement() {  // bookkeeping done after a value has been written
+    final Field current = stack.peek();
+
+    if (current.type == STRUCT) {  // other field types are left untouched here
+      current.fieldIndex++;
+
+      if (current.fieldIndex == current.fieldCount) {
+        // Last field done: write back the final (possibly partial) NULL byte.
+        output.writeByte(current.nullOffset, current.nullByte);
+      }
+    }
+  }
+
+  private void beginComplex(Field field) {
+    beginElement();  // runs while the parent is still on top of the stack
+    stack.push(field);  // the new complex value becomes the current field
+  }
+
+  private void finishComplex() {
+    stack.pop();  // drop the finished complex value first
+    finishElement();  // then finish it as an element of whatever is now on top
+  }
+
+  private static void writeSizeAtOffset(
+      ByteStream.RandomAccessOutput byteStream, int byteSizeStart, int size) {
+    byteStream.writeInt(byteSizeStart, size);  // writes size as an int at offset byteSizeStart
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d467e172/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardUnionObjectInspector.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardUnionObjectInspector.java b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardUnionObjectInspector.java
index f26c9ec..7b28682 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardUnionObjectInspector.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardUnionObjectInspector.java
@@ -79,6 +79,31 @@ public class StandardUnionObjectInspector extends SettableUnionObjectInspector {
     public String toString() {
       return tag + ":" + object;
     }
+
+    @Override
+    public int hashCode() {  // derived from the same two fields that equals() compares
+      if (object == null) {
+        return tag;  // no held object: the tag alone is the hash
+      } else {
+        return object.hashCode() ^ tag;  // mix the held object's hash with the tag
+      }
+    }
+
+    @Override
+    public boolean equals(Object obj) {  // equal when tags match and held objects are equal
+      if (this == obj) {
+        return true;
+      }
+      if (!(obj instanceof StandardUnion)) {  // also rejects a null argument
+        return false;
+      }
+      StandardUnion that = (StandardUnion) obj;
+      if (this.object == null || that.object == null) {
+        return this.tag == that.tag && this.object == that.object;  // needs both objects null
+      } else {
+        return this.tag == that.tag && this.object.equals(that.object);
+      }
+    }
   }
 
   /**