You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/03/16 18:00:01 UTC

[2/4] arrow git commit: ARROW-542: Adding dictionary encoding to FileWriter

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java b/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java
index ab539d5..8f2d042 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java
@@ -33,10 +33,10 @@ import org.apache.arrow.vector.NullableIntVector;
 import org.apache.arrow.vector.NullableIntervalDayVector;
 import org.apache.arrow.vector.NullableIntervalYearVector;
 import org.apache.arrow.vector.NullableSmallIntVector;
-import org.apache.arrow.vector.NullableTimeStampSecVector;
-import org.apache.arrow.vector.NullableTimeStampMilliVector;
 import org.apache.arrow.vector.NullableTimeStampMicroVector;
+import org.apache.arrow.vector.NullableTimeStampMilliVector;
 import org.apache.arrow.vector.NullableTimeStampNanoVector;
+import org.apache.arrow.vector.NullableTimeStampSecVector;
 import org.apache.arrow.vector.NullableTimeVector;
 import org.apache.arrow.vector.NullableTinyIntVector;
 import org.apache.arrow.vector.NullableUInt1Vector;
@@ -61,10 +61,10 @@ import org.apache.arrow.vector.complex.impl.IntervalDayWriterImpl;
 import org.apache.arrow.vector.complex.impl.IntervalYearWriterImpl;
 import org.apache.arrow.vector.complex.impl.NullableMapWriter;
 import org.apache.arrow.vector.complex.impl.SmallIntWriterImpl;
-import org.apache.arrow.vector.complex.impl.TimeStampSecWriterImpl;
-import org.apache.arrow.vector.complex.impl.TimeStampMilliWriterImpl;
 import org.apache.arrow.vector.complex.impl.TimeStampMicroWriterImpl;
+import org.apache.arrow.vector.complex.impl.TimeStampMilliWriterImpl;
 import org.apache.arrow.vector.complex.impl.TimeStampNanoWriterImpl;
+import org.apache.arrow.vector.complex.impl.TimeStampSecWriterImpl;
 import org.apache.arrow.vector.complex.impl.TimeWriterImpl;
 import org.apache.arrow.vector.complex.impl.TinyIntWriterImpl;
 import org.apache.arrow.vector.complex.impl.UInt1WriterImpl;
@@ -92,6 +92,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType.Time;
 import org.apache.arrow.vector.types.pojo.ArrowType.Timestamp;
 import org.apache.arrow.vector.types.pojo.ArrowType.Union;
 import org.apache.arrow.vector.types.pojo.ArrowType.Utf8;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.util.CallBack;
 
@@ -129,7 +130,7 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
         return ZeroVector.INSTANCE;
       }
 
@@ -145,8 +146,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-         return new NullableMapVector(name, allocator, callBack);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+         return new NullableMapVector(name, allocator, dictionary, callBack);
       }
 
       @Override
@@ -161,8 +162,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableTinyIntVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableTinyIntVector(name, allocator, dictionary);
       }
 
       @Override
@@ -177,8 +178,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableSmallIntVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableSmallIntVector(name, allocator, dictionary);
       }
 
       @Override
@@ -193,8 +194,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableIntVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableIntVector(name, allocator, dictionary);
       }
 
       @Override
@@ -209,8 +210,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableBigIntVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableBigIntVector(name, allocator, dictionary);
       }
 
       @Override
@@ -225,8 +226,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableDateVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableDateVector(name, allocator, dictionary);
       }
 
       @Override
@@ -241,8 +242,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableTimeVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableTimeVector(name, allocator, dictionary);
       }
 
       @Override
@@ -258,8 +259,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableTimeStampSecVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableTimeStampSecVector(name, allocator, dictionary);
       }
 
       @Override
@@ -275,8 +276,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableTimeStampMilliVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableTimeStampMilliVector(name, allocator, dictionary);
       }
 
       @Override
@@ -292,8 +293,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableTimeStampMicroVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableTimeStampMicroVector(name, allocator, dictionary);
       }
 
       @Override
@@ -309,8 +310,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableTimeStampNanoVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableTimeStampNanoVector(name, allocator, dictionary);
       }
 
       @Override
@@ -325,8 +326,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableIntervalDayVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableIntervalDayVector(name, allocator, dictionary);
       }
 
       @Override
@@ -341,8 +342,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableIntervalDayVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableIntervalDayVector(name, allocator, dictionary);
       }
 
       @Override
@@ -358,8 +359,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableFloat4Vector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableFloat4Vector(name, allocator, dictionary);
       }
 
       @Override
@@ -375,8 +376,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableFloat8Vector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableFloat8Vector(name, allocator, dictionary);
       }
 
       @Override
@@ -391,8 +392,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableBitVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableBitVector(name, allocator, dictionary);
       }
 
       @Override
@@ -407,8 +408,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableVarCharVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableVarCharVector(name, allocator, dictionary);
       }
 
       @Override
@@ -423,8 +424,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableVarBinaryVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableVarBinaryVector(name, allocator, dictionary);
       }
 
       @Override
@@ -443,8 +444,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableDecimalVector(name, allocator, precisionScale[0], precisionScale[1]);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableDecimalVector(name, allocator, dictionary, precisionScale[0], precisionScale[1]);
       }
 
       @Override
@@ -459,8 +460,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableUInt1Vector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableUInt1Vector(name, allocator, dictionary);
       }
 
       @Override
@@ -475,8 +476,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableUInt2Vector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableUInt2Vector(name, allocator, dictionary);
       }
 
       @Override
@@ -491,8 +492,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableUInt4Vector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableUInt4Vector(name, allocator, dictionary);
       }
 
       @Override
@@ -507,8 +508,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableUInt8Vector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableUInt8Vector(name, allocator, dictionary);
       }
 
       @Override
@@ -523,8 +524,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new ListVector(name, allocator, callBack);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new ListVector(name, allocator, dictionary, callBack);
       }
 
       @Override
@@ -539,7 +540,10 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        if (dictionary != null) {
+          throw new UnsupportedOperationException("Dictionary encoding not supported for complex types");
+        }
         return new UnionVector(name, allocator, callBack);
       }
 
@@ -561,7 +565,7 @@ public class Types {
 
     public abstract Field getField();
 
-    public abstract FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale);
+    public abstract FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale);
 
     public abstract FieldWriter getNewFieldWriter(ValueVector vector);
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/DictionaryEncoding.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/DictionaryEncoding.java b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/DictionaryEncoding.java
new file mode 100644
index 0000000..6d35cde
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/DictionaryEncoding.java
@@ -0,0 +1,51 @@
+/*******************************************************************************
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.arrow.vector.types.pojo;
+
+import org.apache.arrow.vector.types.pojo.ArrowType.Int;
+
+public class DictionaryEncoding {
+
+  private final long id;
+  private final boolean ordered;
+  private final Int indexType;
+
+  public DictionaryEncoding(long id, boolean ordered, Int indexType) {
+    this.id = id;
+    this.ordered = ordered;
+    this.indexType = indexType == null ? new Int(32, true) : indexType;
+  }
+
+  public long getId() {
+    return id;
+  }
+
+  public boolean isOrdered() {
+    return ordered;
+  }
+
+  public Int getIndexType() {
+    return indexType;
+  }
+
+  @Override
+  public String toString() {
+    return "DictionaryEncoding[id=" + id + ",ordered=" + ordered + ",indexType=" + indexType + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java
index f9b79ce..bbbd559 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java
@@ -24,23 +24,27 @@ import static org.apache.arrow.vector.types.pojo.ArrowType.getTypeForField;
 import java.util.List;
 import java.util.Objects;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonInclude.Include;
-import org.apache.arrow.flatbuf.DictionaryEncoding;
-import org.apache.arrow.vector.schema.TypeLayout;
-import org.apache.arrow.vector.schema.VectorLayout;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.flatbuffers.FlatBufferBuilder;
 
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.schema.TypeLayout;
+import org.apache.arrow.vector.schema.VectorLayout;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.ArrowType.Int;
+
 public class Field {
   private final String name;
   private final boolean nullable;
   private final ArrowType type;
-  private final Long dictionary;
+  private final DictionaryEncoding dictionary;
   private final List<Field> children;
   private final TypeLayout typeLayout;
 
@@ -49,7 +53,7 @@ public class Field {
       @JsonProperty("name") String name,
       @JsonProperty("nullable") boolean nullable,
       @JsonProperty("type") ArrowType type,
-      @JsonProperty("dictionary") Long dictionary,
+      @JsonProperty("dictionary") DictionaryEncoding dictionary,
       @JsonProperty("children") List<Field> children,
       @JsonProperty("typeLayout") TypeLayout typeLayout) {
     this.name = name;
@@ -68,18 +72,30 @@ public class Field {
     this(name, nullable, type, null, children, TypeLayout.getTypeLayout(checkNotNull(type)));
   }
 
-  public Field(String name, boolean nullable, ArrowType type, Long dictionary, List<Field> children) {
+  public Field(String name, boolean nullable, ArrowType type, DictionaryEncoding dictionary, List<Field> children) {
     this(name, nullable, type, dictionary, children, TypeLayout.getTypeLayout(checkNotNull(type)));
   }
 
+  public FieldVector createVector(BufferAllocator allocator) {
+    MinorType minorType = Types.getMinorTypeForArrowType(type);
+    FieldVector vector = minorType.getNewVector(name, allocator, dictionary, null);
+    vector.initializeChildrenFromFields(children);
+    return vector;
+  }
+
   public static Field convertField(org.apache.arrow.flatbuf.Field field) {
     String name = field.name();
     boolean nullable = field.nullable();
     ArrowType type = getTypeForField(field);
-    DictionaryEncoding dictionaryEncoding = field.dictionary();
-    Long dictionary = null;
-    if (dictionaryEncoding != null) {
-      dictionary = dictionaryEncoding.id();
+    DictionaryEncoding dictionary = null;
+    org.apache.arrow.flatbuf.DictionaryEncoding dictionaryFB = field.dictionary();
+    if (dictionaryFB != null) {
+      Int indexType = null;
+      org.apache.arrow.flatbuf.Int indexTypeFB = dictionaryFB.indexType();
+      if (indexTypeFB != null) {
+        indexType = new Int(indexTypeFB.bitWidth(), indexTypeFB.isSigned());
+      }
+      dictionary = new DictionaryEncoding(dictionaryFB.id(), dictionaryFB.isOrdered(), indexType);
     }
     ImmutableList.Builder<org.apache.arrow.vector.schema.VectorLayout> layout = ImmutableList.builder();
     for (int i = 0; i < field.layoutLength(); ++i) {
@@ -105,8 +121,11 @@ public class Field {
     int typeOffset = type.getType(builder);
     int dictionaryOffset = -1;
     if (dictionary != null) {
-      builder.addLong(dictionary);
-      dictionaryOffset = builder.offset();
+      // TODO encode dictionary type - currently type is only signed 32 bit int (default null)
+      org.apache.arrow.flatbuf.DictionaryEncoding.startDictionaryEncoding(builder);
+      org.apache.arrow.flatbuf.DictionaryEncoding.addId(builder, dictionary.getId());
+      org.apache.arrow.flatbuf.DictionaryEncoding.addIsOrdered(builder, dictionary.isOrdered());
+      dictionaryOffset = org.apache.arrow.flatbuf.DictionaryEncoding.endDictionaryEncoding(builder);
     }
     int[] childrenData = new int[children.size()];
     for (int i = 0; i < children.size(); i++) {
@@ -126,11 +145,11 @@ public class Field {
     org.apache.arrow.flatbuf.Field.addNullable(builder, nullable);
     org.apache.arrow.flatbuf.Field.addTypeType(builder, type.getTypeID().getFlatbufID());
     org.apache.arrow.flatbuf.Field.addType(builder, typeOffset);
+    org.apache.arrow.flatbuf.Field.addChildren(builder, childrenOffset);
+    org.apache.arrow.flatbuf.Field.addLayout(builder, layoutOffset);
     if (dictionary != null) {
       org.apache.arrow.flatbuf.Field.addDictionary(builder, dictionaryOffset);
     }
-    org.apache.arrow.flatbuf.Field.addChildren(builder, childrenOffset);
-    org.apache.arrow.flatbuf.Field.addLayout(builder, layoutOffset);
     return org.apache.arrow.flatbuf.Field.endField(builder);
   }
 
@@ -147,7 +166,7 @@ public class Field {
   }
 
   @JsonInclude(Include.NON_NULL)
-  public Long getDictionary() { return dictionary; }
+  public DictionaryEncoding getDictionary() { return dictionary; }
 
   public List<Field> getChildren() {
     return children;
@@ -168,8 +187,8 @@ public class Field {
             Objects.equals(this.type, that.type) &&
            Objects.equals(this.dictionary, that.dictionary) &&
             (Objects.equals(this.children, that.children) ||
-                    (this.children == null && that.children.size() == 0) ||
-                    (this.children.size() == 0 && that.children == null));
+                    (this.children == null || this.children.size() == 0) &&
+                    (that.children == null || that.children.size() == 0));
   }
 
   @Override
@@ -180,7 +199,7 @@ public class Field {
     }
     sb.append(type);
     if (dictionary != null) {
-      sb.append("[dictionary: ").append(dictionary).append("]");
+      sb.append("[dictionary: ").append(dictionary.getId()).append("]");
     }
     if (!children.isEmpty()) {
       sb.append("<").append(Joiner.on(", ").join(children)).append(">");

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/TestDecimalVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestDecimalVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestDecimalVector.java
index cca35e4..20f4aa8 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestDecimalVector.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestDecimalVector.java
@@ -44,7 +44,7 @@ public class TestDecimalVector {
   @Test
   public void test() {
     BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
-    NullableDecimalVector decimalVector = new NullableDecimalVector("decimal", allocator, 10, scale);
+    NullableDecimalVector decimalVector = new NullableDecimalVector("decimal", allocator, null, 10, scale);
     decimalVector.allocateNew();
     BigDecimal[] values = new BigDecimal[intValues.length];
     for (int i = 0; i < intValues.length; i++) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/TestDictionaryVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestDictionaryVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestDictionaryVector.java
index 962950a..e3087ef 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestDictionaryVector.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestDictionaryVector.java
@@ -18,16 +18,16 @@
 package org.apache.arrow.vector;
 
 import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.complex.DictionaryVector;
-import org.apache.arrow.vector.types.Dictionary;
+import org.apache.arrow.vector.dictionary.DictionaryEncoder;
+import org.apache.arrow.vector.dictionary.Dictionary;
 import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.nio.charset.StandardCharsets;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
 public class TestDictionaryVector {
@@ -49,65 +49,10 @@ public class TestDictionaryVector {
   }
 
   @Test
-  public void testEncodeStringsWithGeneratedDictionary() {
+  public void testEncodeStrings() {
     // Create a new value vector
-    try (final NullableVarCharVector vector = (NullableVarCharVector) MinorType.VARCHAR.getNewVector("foo", allocator, null)) {
-      final NullableVarCharVector.Mutator m = vector.getMutator();
-      vector.allocateNew(512, 5);
-
-      // set some values
-      m.setSafe(0, zero, 0, zero.length);
-      m.setSafe(1, one, 0, one.length);
-      m.setSafe(2, one, 0, one.length);
-      m.setSafe(3, two, 0, two.length);
-      m.setSafe(4, zero, 0, zero.length);
-      m.setValueCount(5);
-
-      DictionaryVector encoded = DictionaryVector.encode(vector);
-
-      try {
-        // verify values in the dictionary
-        ValueVector dictionary = encoded.getDictionaryVector();
-        assertEquals(vector.getClass(), dictionary.getClass());
-
-        NullableVarCharVector.Accessor dictionaryAccessor = ((NullableVarCharVector) dictionary).getAccessor();
-        assertEquals(3, dictionaryAccessor.getValueCount());
-        assertArrayEquals(zero, dictionaryAccessor.get(0));
-        assertArrayEquals(one, dictionaryAccessor.get(1));
-        assertArrayEquals(two, dictionaryAccessor.get(2));
-
-        // verify indices
-        ValueVector indices = encoded.getIndexVector();
-        assertEquals(NullableIntVector.class, indices.getClass());
-
-        NullableIntVector.Accessor indexAccessor = ((NullableIntVector) indices).getAccessor();
-        assertEquals(5, indexAccessor.getValueCount());
-        assertEquals(0, indexAccessor.get(0));
-        assertEquals(1, indexAccessor.get(1));
-        assertEquals(1, indexAccessor.get(2));
-        assertEquals(2, indexAccessor.get(3));
-        assertEquals(0, indexAccessor.get(4));
-
-        // now run through the decoder and verify we get the original back
-        try (ValueVector decoded = DictionaryVector.decode(indices, encoded.getDictionary())) {
-          assertEquals(vector.getClass(), decoded.getClass());
-          assertEquals(vector.getAccessor().getValueCount(), decoded.getAccessor().getValueCount());
-          for (int i = 0; i < 5; i++) {
-            assertEquals(vector.getAccessor().getObject(i), decoded.getAccessor().getObject(i));
-          }
-        }
-      } finally {
-        encoded.getDictionaryVector().close();
-        encoded.getIndexVector().close();
-      }
-    }
-  }
-
-  @Test
-  public void testEncodeStringsWithProvidedDictionary() {
-    // Create a new value vector
-    try (final NullableVarCharVector vector = (NullableVarCharVector) MinorType.VARCHAR.getNewVector("foo", allocator, null);
-         final NullableVarCharVector dictionary = (NullableVarCharVector) MinorType.VARCHAR.getNewVector("dict", allocator, null)) {
+    try (final NullableVarCharVector vector = (NullableVarCharVector) MinorType.VARCHAR.getNewVector("foo", allocator, null, null);
+         final NullableVarCharVector dictionaryVector = (NullableVarCharVector) MinorType.VARCHAR.getNewVector("dict", allocator, null, null)) {
       final NullableVarCharVector.Mutator m = vector.getMutator();
       vector.allocateNew(512, 5);
 
@@ -120,19 +65,20 @@ public class TestDictionaryVector {
       m.setValueCount(5);
 
       // set some dictionary values
-      final NullableVarCharVector.Mutator m2 = dictionary.getMutator();
-      dictionary.allocateNew(512, 3);
+      final NullableVarCharVector.Mutator m2 = dictionaryVector.getMutator();
+      dictionaryVector.allocateNew(512, 3);
       m2.setSafe(0, zero, 0, zero.length);
       m2.setSafe(1, one, 0, one.length);
       m2.setSafe(2, two, 0, two.length);
       m2.setValueCount(3);
 
-      try(final DictionaryVector encoded = DictionaryVector.encode(vector, new Dictionary(dictionary, false))) {
+      Dictionary dictionary = new Dictionary(dictionaryVector, new DictionaryEncoding(1L, false, null));
+
+      try(final ValueVector encoded = (FieldVector) DictionaryEncoder.encode(vector, dictionary)) {
         // verify indices
-        ValueVector indices = encoded.getIndexVector();
-        assertEquals(NullableIntVector.class, indices.getClass());
+        assertEquals(NullableIntVector.class, encoded.getClass());
 
-        NullableIntVector.Accessor indexAccessor = ((NullableIntVector) indices).getAccessor();
+        NullableIntVector.Accessor indexAccessor = ((NullableIntVector) encoded).getAccessor();
         assertEquals(5, indexAccessor.getValueCount());
         assertEquals(0, indexAccessor.get(0));
         assertEquals(1, indexAccessor.get(1));
@@ -141,7 +87,7 @@ public class TestDictionaryVector {
         assertEquals(0, indexAccessor.get(4));
 
         // now run through the decoder and verify we get the original back
-        try (ValueVector decoded = DictionaryVector.decode(indices, encoded.getDictionary())) {
+        try (ValueVector decoded = DictionaryEncoder.decode(encoded, dictionary)) {
           assertEquals(vector.getClass(), decoded.getClass());
           assertEquals(vector.getAccessor().getValueCount(), decoded.getAccessor().getValueCount());
           for (int i = 0; i < 5; i++) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java
index 1f0baae..18d93b6 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java
@@ -42,8 +42,8 @@ public class TestListVector {
 
   @Test
   public void testCopyFrom() throws Exception {
-    try (ListVector inVector = new ListVector("input", allocator, null);
-         ListVector outVector = new ListVector("output", allocator, null)) {
+    try (ListVector inVector = new ListVector("input", allocator, null, null);
+         ListVector outVector = new ListVector("output", allocator, null, null)) {
       UnionListWriter writer = inVector.getWriter();
       writer.allocate();
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java
index 774b59e..6917638 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java
@@ -86,7 +86,7 @@ public class TestValueVector {
   public void testNullableVarLen2() {
 
     // Create a new value vector for 1024 integers.
-    try (final NullableVarCharVector vector = new NullableVarCharVector(EMPTY_SCHEMA_PATH, allocator)) {
+    try (final NullableVarCharVector vector = new NullableVarCharVector(EMPTY_SCHEMA_PATH, allocator, null)) {
       final NullableVarCharVector.Mutator m = vector.getMutator();
       vector.allocateNew(1024 * 10, 1024);
 
@@ -116,7 +116,7 @@ public class TestValueVector {
   public void testNullableFixedType() {
 
     // Create a new value vector for 1024 integers.
-    try (final NullableUInt4Vector vector = new NullableUInt4Vector(EMPTY_SCHEMA_PATH, allocator)) {
+    try (final NullableUInt4Vector vector = new NullableUInt4Vector(EMPTY_SCHEMA_PATH, allocator, null)) {
       final NullableUInt4Vector.Mutator m = vector.getMutator();
       vector.allocateNew(1024);
 
@@ -186,7 +186,7 @@ public class TestValueVector {
   @Test
   public void testNullableFloat() {
     // Create a new value vector for 1024 integers
-    try (final NullableFloat4Vector vector = (NullableFloat4Vector) MinorType.FLOAT4.getNewVector(EMPTY_SCHEMA_PATH, allocator, null)) {
+    try (final NullableFloat4Vector vector = (NullableFloat4Vector) MinorType.FLOAT4.getNewVector(EMPTY_SCHEMA_PATH, allocator, null, null)) {
       final NullableFloat4Vector.Mutator m = vector.getMutator();
       vector.allocateNew(1024);
 
@@ -233,7 +233,7 @@ public class TestValueVector {
   @Test
   public void testNullableInt() {
     // Create a new value vector for 1024 integers
-    try (final NullableIntVector vector = (NullableIntVector) MinorType.INT.getNewVector(EMPTY_SCHEMA_PATH, allocator, null)) {
+    try (final NullableIntVector vector = (NullableIntVector) MinorType.INT.getNewVector(EMPTY_SCHEMA_PATH, allocator, null, null)) {
       final NullableIntVector.Mutator m = vector.getMutator();
       vector.allocateNew(1024);
 
@@ -403,7 +403,7 @@ public class TestValueVector {
   @Test
   public void testReAllocNullableFixedWidthVector() {
     // Create a new value vector for 1024 integers
-    try (final NullableFloat4Vector vector = (NullableFloat4Vector) MinorType.FLOAT4.getNewVector(EMPTY_SCHEMA_PATH, allocator, null)) {
+    try (final NullableFloat4Vector vector = (NullableFloat4Vector) MinorType.FLOAT4.getNewVector(EMPTY_SCHEMA_PATH, allocator, null, null)) {
       final NullableFloat4Vector.Mutator m = vector.getMutator();
       vector.allocateNew(1024);
 
@@ -436,7 +436,7 @@ public class TestValueVector {
   @Test
   public void testReAllocNullableVariableWidthVector() {
     // Create a new value vector for 1024 integers
-    try (final NullableVarCharVector vector = (NullableVarCharVector) MinorType.VARCHAR.getNewVector(EMPTY_SCHEMA_PATH, allocator, null)) {
+    try (final NullableVarCharVector vector = (NullableVarCharVector) MinorType.VARCHAR.getNewVector(EMPTY_SCHEMA_PATH, allocator, null, null)) {
       final NullableVarCharVector.Mutator m = vector.getMutator();
       vector.allocateNew();
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
index 79c9d50..372bcf0 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import io.netty.buffer.ArrowBuf;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.complex.MapVector;
@@ -46,8 +47,6 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Test;
 
-import io.netty.buffer.ArrowBuf;
-
 public class TestVectorUnloadLoad {
 
   static final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
@@ -81,8 +80,8 @@ public class TestVectorUnloadLoad {
       try (
           ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
           BufferAllocator finalVectorsAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
-          VectorSchemaRoot newRoot = new VectorSchemaRoot(schema, finalVectorsAllocator);
-          ) {
+          VectorSchemaRoot newRoot = VectorSchemaRoot.create(schema, finalVectorsAllocator);
+      ) {
 
         // load it
         VectorLoader vectorLoader = new VectorLoader(newRoot);
@@ -131,8 +130,8 @@ public class TestVectorUnloadLoad {
       try (
           ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
           BufferAllocator finalVectorsAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
-          VectorSchemaRoot newRoot = new VectorSchemaRoot(schema, finalVectorsAllocator);
-          ) {
+          VectorSchemaRoot newRoot = VectorSchemaRoot.create(schema, finalVectorsAllocator);
+      ) {
         List<ArrowBuf> oldBuffers = recordBatch.getBuffers();
         List<ArrowBuf> newBuffers = new ArrayList<>();
         for (ArrowBuf oldBuffer : oldBuffers) {
@@ -185,7 +184,7 @@ public class TestVectorUnloadLoad {
     Schema schema = new Schema(asList(
         new Field("intDefined", true, new ArrowType.Int(32, true), Collections.<Field>emptyList()),
         new Field("intNull", true, new ArrowType.Int(32, true), Collections.<Field>emptyList())
-        ));
+                                     ));
     int count = 10;
     ArrowBuf validity = allocator.buffer(10).slice(0, 0);
     ArrowBuf[] values = new ArrowBuf[2];
@@ -200,8 +199,8 @@ public class TestVectorUnloadLoad {
     try (
         ArrowRecordBatch recordBatch = new ArrowRecordBatch(count, asList(new ArrowFieldNode(count, 0), new ArrowFieldNode(count, count)), asList(validity, values[0], validity, values[1]));
         BufferAllocator finalVectorsAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
-        VectorSchemaRoot newRoot = new VectorSchemaRoot(schema, finalVectorsAllocator);
-        ) {
+        VectorSchemaRoot newRoot = VectorSchemaRoot.create(schema, finalVectorsAllocator);
+    ) {
 
       // load it
       VectorLoader vectorLoader = new VectorLoader(newRoot);
@@ -244,11 +243,12 @@ public class TestVectorUnloadLoad {
     Schema schema = new Schema(root.getField().getChildren());
     int valueCount = root.getAccessor().getValueCount();
     List<FieldVector> fields = root.getChildrenFromFields();
-    return new VectorUnloader(schema, valueCount, fields);
+    VectorSchemaRoot vsr = new VectorSchemaRoot(schema.getFields(), fields, valueCount);
+    return new VectorUnloader(vsr);
   }
 
   @AfterClass
   public static void afterClass() {
     allocator.close();
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestPromotableWriter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestPromotableWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestPromotableWriter.java
index 58312b3..2b49d8e 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestPromotableWriter.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestPromotableWriter.java
@@ -53,7 +53,7 @@ public class TestPromotableWriter {
   public void testPromoteToUnion() throws Exception {
 
     try (final MapVector container = new MapVector(EMPTY_SCHEMA_PATH, allocator, null);
-         final NullableMapVector v = container.addOrGet("test", MinorType.MAP, NullableMapVector.class);
+         final NullableMapVector v = container.addOrGet("test", MinorType.MAP, NullableMapVector.class, null);
          final PromotableWriter writer = new PromotableWriter(v, container)) {
 
       container.allocateNew();

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java
index 7a2d416..a8a2d51 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java
@@ -181,7 +181,7 @@ public class TestComplexWriter {
 
   @Test
   public void listScalarType() {
-    ListVector listVector = new ListVector("list", allocator, null);
+    ListVector listVector = new ListVector("list", allocator, null, null);
     listVector.allocateNew();
     UnionListWriter listWriter = new UnionListWriter(listVector);
     for (int i = 0; i < COUNT; i++) {
@@ -204,7 +204,7 @@ public class TestComplexWriter {
 
   @Test
   public void listScalarTypeNullable() {
-    ListVector listVector = new ListVector("list", allocator, null);
+    ListVector listVector = new ListVector("list", allocator, null, null);
     listVector.allocateNew();
     UnionListWriter listWriter = new UnionListWriter(listVector);
     for (int i = 0; i < COUNT; i++) {
@@ -233,7 +233,7 @@ public class TestComplexWriter {
 
   @Test
   public void listMapType() {
-    ListVector listVector = new ListVector("list", allocator, null);
+    ListVector listVector = new ListVector("list", allocator, null, null);
     listVector.allocateNew();
     UnionListWriter listWriter = new UnionListWriter(listVector);
     MapWriter mapWriter = listWriter.map();
@@ -261,7 +261,7 @@ public class TestComplexWriter {
 
   @Test
   public void listListType() {
-    try (ListVector listVector = new ListVector("list", allocator, null)) {
+    try (ListVector listVector = new ListVector("list", allocator, null, null)) {
       listVector.allocateNew();
       UnionListWriter listWriter = new UnionListWriter(listVector);
       for (int i = 0; i < COUNT; i++) {
@@ -286,7 +286,7 @@ public class TestComplexWriter {
    */
   @Test
   public void listListType2() {
-    try (ListVector listVector = new ListVector("list", allocator, null)) {
+    try (ListVector listVector = new ListVector("list", allocator, null, null)) {
       listVector.allocateNew();
       UnionListWriter listWriter = new UnionListWriter(listVector);
       ListWriter innerListWriter = listWriter.list();
@@ -324,7 +324,7 @@ public class TestComplexWriter {
 
   @Test
   public void unionListListType() {
-    try (ListVector listVector = new ListVector("list", allocator, null)) {
+    try (ListVector listVector = new ListVector("list", allocator, null, null)) {
       listVector.allocateNew();
       UnionListWriter listWriter = new UnionListWriter(listVector);
       for (int i = 0; i < COUNT; i++) {
@@ -353,7 +353,7 @@ public class TestComplexWriter {
    */
   @Test
   public void unionListListType2() {
-    try (ListVector listVector = new ListVector("list", allocator, null)) {
+    try (ListVector listVector = new ListVector("list", allocator, null, null)) {
       listVector.allocateNew();
       UnionListWriter listWriter = new UnionListWriter(listVector);
       ListWriter innerListWriter = listWriter.list();

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
index a83a283..75e5d2d 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
@@ -17,31 +17,44 @@
  */
 package org.apache.arrow.vector.file;
 
-import static org.apache.arrow.vector.TestVectorUnloadLoad.newVectorUnloader;
-import static org.junit.Assert.assertTrue;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.List;
 
+import com.google.common.collect.ImmutableList;
+
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.NullableTinyIntVector;
+import org.apache.arrow.vector.NullableVarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.complex.ListVector;
 import org.apache.arrow.vector.complex.MapVector;
 import org.apache.arrow.vector.complex.NullableMapVector;
+import org.apache.arrow.vector.complex.impl.UnionListWriter;
+import org.apache.arrow.vector.dictionary.Dictionary;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider;
+import org.apache.arrow.vector.dictionary.DictionaryEncoder;
 import org.apache.arrow.vector.schema.ArrowBuffer;
+import org.apache.arrow.vector.schema.ArrowMessage;
 import org.apache.arrow.vector.schema.ArrowRecordBatch;
 import org.apache.arrow.vector.stream.ArrowStreamReader;
 import org.apache.arrow.vector.stream.ArrowStreamWriter;
+import org.apache.arrow.vector.stream.MessageSerializerTest;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.ArrowType.Int;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
+import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.Text;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -68,7 +81,7 @@ public class TestArrowFile extends BaseFileTest {
     int count = COUNT;
     try (
         BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
-        NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null)) {
+        NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null, null)) {
       writeComplexData(count, parent);
       FieldVector root = parent.getChild("root");
       validateComplexContent(count, new VectorSchemaRoot(root));
@@ -83,71 +96,63 @@ public class TestArrowFile extends BaseFileTest {
     int count = COUNT;
 
     // write
-    try (
-        BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
-        MapVector parent = new MapVector("parent", originalVectorAllocator, null)) {
+    try (BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
+         MapVector parent = new MapVector("parent", originalVectorAllocator, null)) {
       writeData(count, parent);
       write(parent.getChild("root"), file, stream);
     }
 
     // read
-    try (
-        BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
-        FileInputStream fileInputStream = new FileInputStream(file);
-        ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator);
-        BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
-        MapVector parent = new MapVector("parent", vectorAllocator, null)
-        ) {
-      ArrowFooter footer = arrowReader.readFooter();
-      Schema schema = footer.getSchema();
-      LOGGER.debug("reading schema: " + schema);
-
-      // initialize vectors
-
-      try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator)) {
-        VectorLoader vectorLoader = new VectorLoader(root);
-
-        List<ArrowBlock> recordBatches = footer.getRecordBatches();
-        for (ArrowBlock rbBlock : recordBatches) {
-          try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
-            List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
-            for (ArrowBuffer arrowBuffer : buffersLayout) {
-              Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         FileInputStream fileInputStream = new FileInputStream(file);
+         ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator){
+            @Override
+            protected ArrowMessage readMessage(SeekableReadChannel in, BufferAllocator allocator) throws IOException {
+              ArrowMessage message = super.readMessage(in, allocator);
+              if (message != null) {
+                ArrowRecordBatch batch = (ArrowRecordBatch) message;
+                List<ArrowBuffer> buffersLayout = batch.getBuffersLayout();
+                for (ArrowBuffer arrowBuffer : buffersLayout) {
+                  Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
+                }
+              }
+              return message;
             }
-            vectorLoader.load(recordBatch);
-          }
-
-          validateContent(count, root);
-        }
+         }) {
+      Schema schema = arrowReader.getVectorSchemaRoot().getSchema();
+      LOGGER.debug("reading schema: " + schema);
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) {
+        arrowReader.loadRecordBatch(rbBlock);
+        Assert.assertEquals(count, root.getRowCount());
+        validateContent(count, root);
       }
     }
 
     // Read from stream.
-    try (
-        BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
-        ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
-        ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator);
-        BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
-        MapVector parent = new MapVector("parent", vectorAllocator, null)
-        ) {
-      arrowReader.init();
-      Schema schema = arrowReader.getSchema();
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
+         ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator){
+           @Override
+           protected ArrowMessage readMessage(ReadChannel in, BufferAllocator allocator) throws IOException {
+             ArrowMessage message = super.readMessage(in, allocator);
+             if (message != null) {
+               ArrowRecordBatch batch = (ArrowRecordBatch) message;
+               List<ArrowBuffer> buffersLayout = batch.getBuffersLayout();
+               for (ArrowBuffer arrowBuffer : buffersLayout) {
+                 Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
+               }
+             }
+             return message;
+           }
+         }) {
+
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
       LOGGER.debug("reading schema: " + schema);
-
-      try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator)) {
-        VectorLoader vectorLoader = new VectorLoader(root);
-        while (true) {
-          try (ArrowRecordBatch recordBatch = arrowReader.nextRecordBatch()) {
-            if (recordBatch == null) break;
-            List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
-            for (ArrowBuffer arrowBuffer : buffersLayout) {
-              Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
-            }
-            vectorLoader.load(recordBatch);
-          }
-        }
-        validateContent(count, root);
-      }
+      arrowReader.loadNextBatch();
+      Assert.assertEquals(count, root.getRowCount());
+      validateContent(count, root);
     }
   }
 
@@ -158,61 +163,37 @@ public class TestArrowFile extends BaseFileTest {
     int count = COUNT;
 
     // write
-    try (
-        BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
-        MapVector parent = new MapVector("parent", originalVectorAllocator, null)) {
+    try (BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
+         MapVector parent = new MapVector("parent", originalVectorAllocator, null)) {
       writeComplexData(count, parent);
       write(parent.getChild("root"), file, stream);
     }
 
     // read
-    try (
-        BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
-        FileInputStream fileInputStream = new FileInputStream(file);
-        ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator);
-        BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
-        NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null)
-        ) {
-      ArrowFooter footer = arrowReader.readFooter();
-      Schema schema = footer.getSchema();
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         FileInputStream fileInputStream = new FileInputStream(file);
+         ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
       LOGGER.debug("reading schema: " + schema);
 
-      // initialize vectors
-
-      try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator)) {
-        VectorLoader vectorLoader = new VectorLoader(root);
-        List<ArrowBlock> recordBatches = footer.getRecordBatches();
-        for (ArrowBlock rbBlock : recordBatches) {
-          try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
-            vectorLoader.load(recordBatch);
-          }
-          validateComplexContent(count, root);
-        }
+      for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) {
+        arrowReader.loadRecordBatch(rbBlock);
+        Assert.assertEquals(count, root.getRowCount());
+        validateComplexContent(count, root);
       }
     }
 
     // Read from stream.
-    try (
-        BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
-        ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
-        ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator);
-        BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
-        MapVector parent = new MapVector("parent", vectorAllocator, null)
-        ) {
-      arrowReader.init();
-      Schema schema = arrowReader.getSchema();
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
+         ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
       LOGGER.debug("reading schema: " + schema);
-
-      try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator)) {
-        VectorLoader vectorLoader = new VectorLoader(root);
-        while (true) {
-          try (ArrowRecordBatch recordBatch = arrowReader.nextRecordBatch()) {
-            if (recordBatch == null) break;
-            vectorLoader.load(recordBatch);
-          }
-        }
-        validateComplexContent(count, root);
-      }
+      arrowReader.loadNextBatch();
+      Assert.assertEquals(count, root.getRowCount());
+      validateComplexContent(count, root);
     }
   }
 
@@ -223,94 +204,70 @@ public class TestArrowFile extends BaseFileTest {
     int[] counts = { 10, 5 };
 
     // write
-    try (
-        BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
-        MapVector parent = new MapVector("parent", originalVectorAllocator, null);
-        FileOutputStream fileOutputStream = new FileOutputStream(file);) {
+    try (BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
+         MapVector parent = new MapVector("parent", originalVectorAllocator, null);
+         FileOutputStream fileOutputStream = new FileOutputStream(file)){
       writeData(counts[0], parent);
-      VectorUnloader vectorUnloader0 = newVectorUnloader(parent.getChild("root"));
-      Schema schema = vectorUnloader0.getSchema();
-      Assert.assertEquals(2, schema.getFields().size());
-      try (ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
-          ArrowStreamWriter streamWriter = new ArrowStreamWriter(stream, schema)) {
-        try (ArrowRecordBatch recordBatch = vectorUnloader0.getRecordBatch()) {
-          Assert.assertEquals("RB #0", counts[0], recordBatch.getLength());
-          arrowWriter.writeRecordBatch(recordBatch);
-          streamWriter.writeRecordBatch(recordBatch);
-        }
+      VectorSchemaRoot root = new VectorSchemaRoot(parent.getChild("root"));
+
+      try(ArrowFileWriter fileWriter = new ArrowFileWriter(root, null, fileOutputStream.getChannel());
+          ArrowStreamWriter streamWriter = new ArrowStreamWriter(root, null, stream)) {
+        fileWriter.start();
+        streamWriter.start();
+
+        fileWriter.writeBatch();
+        streamWriter.writeBatch();
+
         parent.allocateNew();
         writeData(counts[1], parent); // if we write the same data we don't catch that the metadata is stored in the wrong order.
-        VectorUnloader vectorUnloader1 = newVectorUnloader(parent.getChild("root"));
-        try (ArrowRecordBatch recordBatch = vectorUnloader1.getRecordBatch()) {
-          Assert.assertEquals("RB #1", counts[1], recordBatch.getLength());
-          arrowWriter.writeRecordBatch(recordBatch);
-          streamWriter.writeRecordBatch(recordBatch);
-        }
+        root.setRowCount(counts[1]);
+
+        fileWriter.writeBatch();
+        streamWriter.writeBatch();
+
+        fileWriter.end();
+        streamWriter.end();
       }
     }
 
-    // read
-    try (
-        BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
-        FileInputStream fileInputStream = new FileInputStream(file);
-        ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator);
-        BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
-        MapVector parent = new MapVector("parent", vectorAllocator, null);
-        ) {
-      ArrowFooter footer = arrowReader.readFooter();
-      Schema schema = footer.getSchema();
+    // read file
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         FileInputStream fileInputStream = new FileInputStream(file);
+         ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
       LOGGER.debug("reading schema: " + schema);
       int i = 0;
-      try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator);) {
-        VectorLoader vectorLoader = new VectorLoader(root);
-        List<ArrowBlock> recordBatches = footer.getRecordBatches();
-        Assert.assertEquals(2, recordBatches.size());
-        long previousOffset = 0;
-        for (ArrowBlock rbBlock : recordBatches) {
-          Assert.assertTrue(rbBlock.getOffset() + " > " + previousOffset, rbBlock.getOffset() > previousOffset);
-          previousOffset = rbBlock.getOffset();
-          try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
-            Assert.assertEquals("RB #" + i, counts[i], recordBatch.getLength());
-            List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
-            for (ArrowBuffer arrowBuffer : buffersLayout) {
-              Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
-            }
-            vectorLoader.load(recordBatch);
-            validateContent(counts[i], root);
-          }
-          ++i;
-        }
+      List<ArrowBlock> recordBatches = arrowReader.getRecordBlocks();
+      Assert.assertEquals(2, recordBatches.size());
+      long previousOffset = 0;
+      for (ArrowBlock rbBlock : recordBatches) {
+        Assert.assertTrue(rbBlock.getOffset() + " > " + previousOffset, rbBlock.getOffset() > previousOffset);
+        previousOffset = rbBlock.getOffset();
+        arrowReader.loadRecordBatch(rbBlock);
+        Assert.assertEquals("RB #" + i, counts[i], root.getRowCount());
+        validateContent(counts[i], root);
+        ++i;
       }
     }
 
     // read stream
-    try (
-        BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
-        ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
-        ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator);
-        BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
-        MapVector parent = new MapVector("parent", vectorAllocator, null)
-        ) {
-      arrowReader.init();
-      Schema schema = arrowReader.getSchema();
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
+         ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
       LOGGER.debug("reading schema: " + schema);
       int i = 0;
-      try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator);) {
-        VectorLoader vectorLoader = new VectorLoader(root);
-        for (int n = 0; n < 2; n++) {
-          try (ArrowRecordBatch recordBatch = arrowReader.nextRecordBatch()) {
-            assertTrue(recordBatch != null);
-            Assert.assertEquals("RB #" + i, counts[i], recordBatch.getLength());
-            List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
-            for (ArrowBuffer arrowBuffer : buffersLayout) {
-              Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
-            }
-            vectorLoader.load(recordBatch);
-            validateContent(counts[i], root);
-          }
-          ++i;
-        }
+
+      for (int n = 0; n < 2; n++) {
+        arrowReader.loadNextBatch();
+        Assert.assertEquals("RB #" + i, counts[i], root.getRowCount());
+        validateContent(counts[i], root);
+        ++i;
       }
+      arrowReader.loadNextBatch();
+      Assert.assertEquals(0, root.getRowCount());
     }
   }
 
@@ -319,90 +276,326 @@ public class TestArrowFile extends BaseFileTest {
     File file = new File("target/mytest_write_union.arrow");
     ByteArrayOutputStream stream = new ByteArrayOutputStream();
     int count = COUNT;
-    try (
-        BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
-        NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null)) {
 
+    // write
+    try (BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
+         NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null, null)) {
       writeUnionData(count, parent);
-
-      printVectors(parent.getChildrenFromFields());
-
       validateUnionData(count, new VectorSchemaRoot(parent.getChild("root")));
-
       write(parent.getChild("root"), file, stream);
     }
-    // read
-    try (
-        BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
-        FileInputStream fileInputStream = new FileInputStream(file);
-        ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator);
-        BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
-        ) {
-      ArrowFooter footer = arrowReader.readFooter();
-      Schema schema = footer.getSchema();
+
+    // read file
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         FileInputStream fileInputStream = new FileInputStream(file);
+         ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
       LOGGER.debug("reading schema: " + schema);
+      arrowReader.loadNextBatch();
+      validateUnionData(count, root);
+    }
+
+    // Read from stream.
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
+         ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
+      LOGGER.debug("reading schema: " + schema);
+      arrowReader.loadNextBatch();
+      validateUnionData(count, root);
+    }
+  }
 
-      // initialize vectors
-      try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator);) {
-        VectorLoader vectorLoader = new VectorLoader(root);
-        List<ArrowBlock> recordBatches = footer.getRecordBatches();
-        for (ArrowBlock rbBlock : recordBatches) {
-          try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
-            vectorLoader.load(recordBatch);
-          }
-          validateUnionData(count, root);
-        }
+  @Test
+  public void testWriteReadTiny() throws IOException {
+    File file = new File("target/mytest_write_tiny.arrow");
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+
+    try (VectorSchemaRoot root = VectorSchemaRoot.create(MessageSerializerTest.testSchema(), allocator)) {
+      root.getFieldVectors().get(0).allocateNew();
+      NullableTinyIntVector.Mutator mutator = (NullableTinyIntVector.Mutator) root.getFieldVectors().get(0).getMutator();
+      for (int i = 0; i < 16; i++) {
+        mutator.set(i, i < 8 ? 1 : 0, (byte)(i + 1));
+      }
+      mutator.setValueCount(16);
+      root.setRowCount(16);
+
+      // write file
+      try (FileOutputStream fileOutputStream = new FileOutputStream(file);
+           ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream.getChannel())) {
+        LOGGER.debug("writing schema: " + root.getSchema());
+        arrowWriter.start();
+        arrowWriter.writeBatch();
+        arrowWriter.end();
+      }
+      // write stream
+      try (ArrowStreamWriter arrowWriter = new ArrowStreamWriter(root, null, stream)) {
+        arrowWriter.start();
+        arrowWriter.writeBatch();
+        arrowWriter.end();
       }
     }
 
+    // read file
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("fileReader", 0, Integer.MAX_VALUE);
+         FileInputStream fileInputStream = new FileInputStream(file);
+         ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
+      LOGGER.debug("reading schema: " + schema);
+      arrowReader.loadNextBatch();
+      validateTinyData(root);
+    }
+
     // Read from stream.
-    try (
-        BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
-        ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
-        ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator);
-        BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
-        MapVector parent = new MapVector("parent", vectorAllocator, null)
-        ) {
-      arrowReader.init();
-      Schema schema = arrowReader.getSchema();
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("streamReader", 0, Integer.MAX_VALUE);
+         ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
+         ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
+      LOGGER.debug("reading schema: " + schema);
+      arrowReader.loadNextBatch();
+      validateTinyData(root);
+    }
+  }
+
+  private void validateTinyData(VectorSchemaRoot root) {
+    Assert.assertEquals(16, root.getRowCount());
+    NullableTinyIntVector vector = (NullableTinyIntVector) root.getFieldVectors().get(0);
+    for (int i = 0; i < 16; i++) {
+      if (i < 8) {
+        Assert.assertEquals((byte)(i + 1), vector.getAccessor().get(i));
+      } else {
+        Assert.assertTrue(vector.getAccessor().isNull(i));
+      }
+    }
+  }
+
+  @Test
+  public void testWriteReadDictionary() throws IOException {
+    File file = new File("target/mytest_dict.arrow");
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+
+    // write
+    try (BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
+         NullableVarCharVector vector = new NullableVarCharVector("varchar", originalVectorAllocator, null);
+         NullableVarCharVector dictionaryVector = new NullableVarCharVector("dict", originalVectorAllocator, null)) {
+      vector.allocateNewSafe();
+      NullableVarCharVector.Mutator mutator = vector.getMutator();
+      mutator.set(0, "foo".getBytes(StandardCharsets.UTF_8));
+      mutator.set(1, "bar".getBytes(StandardCharsets.UTF_8));
+      mutator.set(3, "baz".getBytes(StandardCharsets.UTF_8));
+      mutator.set(4, "bar".getBytes(StandardCharsets.UTF_8));
+      mutator.set(5, "baz".getBytes(StandardCharsets.UTF_8));
+      mutator.setValueCount(6);
+
+      dictionaryVector.allocateNewSafe();
+      mutator = dictionaryVector.getMutator();
+      mutator.set(0, "foo".getBytes(StandardCharsets.UTF_8));
+      mutator.set(1, "bar".getBytes(StandardCharsets.UTF_8));
+      mutator.set(2, "baz".getBytes(StandardCharsets.UTF_8));
+      mutator.setValueCount(3);
+
+      Dictionary dictionary = new Dictionary(dictionaryVector, new DictionaryEncoding(1L, false, null));
+      MapDictionaryProvider provider = new MapDictionaryProvider();
+      provider.put(dictionary);
+
+      FieldVector encodedVector = (FieldVector) DictionaryEncoder.encode(vector, dictionary);
+
+      List<Field> fields = ImmutableList.of(encodedVector.getField());
+      List<FieldVector> vectors = ImmutableList.of(encodedVector);
+      VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 6);
+
+      try (FileOutputStream fileOutputStream = new FileOutputStream(file);
+           ArrowFileWriter fileWriter = new ArrowFileWriter(root, provider, fileOutputStream.getChannel());
+           ArrowStreamWriter streamWriter = new ArrowStreamWriter(root, provider, stream)) {
+        LOGGER.debug("writing schema: " + root.getSchema());
+        fileWriter.start();
+        streamWriter.start();
+        fileWriter.writeBatch();
+        streamWriter.writeBatch();
+        fileWriter.end();
+        streamWriter.end();
+      }
+
+      dictionaryVector.close();
+      encodedVector.close();
+    }
+
+    // read from file
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         FileInputStream fileInputStream = new FileInputStream(file);
+         ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
       LOGGER.debug("reading schema: " + schema);
+      arrowReader.loadNextBatch();
+      validateFlatDictionary(root.getFieldVectors().get(0), arrowReader);
+    }
+
+    // Read from stream
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
+         ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
+      LOGGER.debug("reading schema: " + schema);
+      arrowReader.loadNextBatch();
+      validateFlatDictionary(root.getFieldVectors().get(0), arrowReader);
+    }
+  }
+
+  private void validateFlatDictionary(FieldVector vector, DictionaryProvider provider) {
+    Assert.assertNotNull(vector);
+
+    DictionaryEncoding encoding = vector.getField().getDictionary();
+    Assert.assertNotNull(encoding);
+    Assert.assertEquals(1L, encoding.getId());
+
+    FieldVector.Accessor accessor = vector.getAccessor();
+    Assert.assertEquals(6, accessor.getValueCount());
+    Assert.assertEquals(0, accessor.getObject(0));
+    Assert.assertEquals(1, accessor.getObject(1));
+    Assert.assertEquals(null, accessor.getObject(2));
+    Assert.assertEquals(2, accessor.getObject(3));
+    Assert.assertEquals(1, accessor.getObject(4));
+    Assert.assertEquals(2, accessor.getObject(5));
+
+    Dictionary dictionary = provider.lookup(1L);
+    Assert.assertNotNull(dictionary);
+    NullableVarCharVector.Accessor dictionaryAccessor = ((NullableVarCharVector) dictionary.getVector()).getAccessor();
+    Assert.assertEquals(3, dictionaryAccessor.getValueCount());
+    Assert.assertEquals(new Text("foo"), dictionaryAccessor.getObject(0));
+    Assert.assertEquals(new Text("bar"), dictionaryAccessor.getObject(1));
+    Assert.assertEquals(new Text("baz"), dictionaryAccessor.getObject(2));
+  }
 
-      try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator)) {
-        VectorLoader vectorLoader = new VectorLoader(root);
-        while (true) {
-          try (ArrowRecordBatch recordBatch = arrowReader.nextRecordBatch()) {
-            if (recordBatch == null) break;
-            vectorLoader.load(recordBatch);
-          }
-        }
-        validateUnionData(count, root);
+  @Test
+  public void testWriteReadNestedDictionary() throws IOException {
+    File file = new File("target/mytest_dict_nested.arrow");
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+
+    DictionaryEncoding encoding = new DictionaryEncoding(2L, false, null);
+
+    // data being written:
+    // [['foo', 'bar'], ['foo'], ['bar']] -> [[0, 1], [0], [1]]
+
+    // write
+    try (NullableVarCharVector dictionaryVector = new NullableVarCharVector("dictionary", allocator, null);
+         ListVector listVector = new ListVector("list", allocator, null, null)) {
+
+      Dictionary dictionary = new Dictionary(dictionaryVector, encoding);
+      MapDictionaryProvider provider = new MapDictionaryProvider();
+      provider.put(dictionary);
+
+      dictionaryVector.allocateNew();
+      dictionaryVector.getMutator().set(0, "foo".getBytes(StandardCharsets.UTF_8));
+      dictionaryVector.getMutator().set(1, "bar".getBytes(StandardCharsets.UTF_8));
+      dictionaryVector.getMutator().setValueCount(2);
+
+      listVector.addOrGetVector(MinorType.INT, encoding);
+      listVector.allocateNew();
+      UnionListWriter listWriter = new UnionListWriter(listVector);
+      listWriter.startList();
+      listWriter.writeInt(0);
+      listWriter.writeInt(1);
+      listWriter.endList();
+      listWriter.startList();
+      listWriter.writeInt(0);
+      listWriter.endList();
+      listWriter.startList();
+      listWriter.writeInt(1);
+      listWriter.endList();
+      listWriter.setValueCount(3);
+
+      List<Field> fields = ImmutableList.of(listVector.getField());
+      List<FieldVector> vectors = ImmutableList.of((FieldVector) listVector);
+      VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 3);
+
+      try (
+           FileOutputStream fileOutputStream = new FileOutputStream(file);
+           ArrowFileWriter fileWriter = new ArrowFileWriter(root, provider, fileOutputStream.getChannel());
+           ArrowStreamWriter streamWriter = new ArrowStreamWriter(root, provider, stream)) {
+        LOGGER.debug("writing schema: " + root.getSchema());
+        fileWriter.start();
+        streamWriter.start();
+        fileWriter.writeBatch();
+        streamWriter.writeBatch();
+        fileWriter.end();
+        streamWriter.end();
       }
     }
+
+    // read from file
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         FileInputStream fileInputStream = new FileInputStream(file);
+         ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
+      LOGGER.debug("reading schema: " + schema);
+      arrowReader.loadNextBatch();
+      validateNestedDictionary((ListVector) root.getFieldVectors().get(0), arrowReader);
+    }
+
+    // Read from stream
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
+         ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
+      LOGGER.debug("reading schema: " + schema);
+      arrowReader.loadNextBatch();
+      validateNestedDictionary((ListVector) root.getFieldVectors().get(0), arrowReader);
+    }
+  }
+
+  private void validateNestedDictionary(ListVector vector, DictionaryProvider provider) {
+    Assert.assertNotNull(vector);
+    Assert.assertNull(vector.getField().getDictionary());
+    Field nestedField = vector.getField().getChildren().get(0);
+
+    DictionaryEncoding encoding = nestedField.getDictionary();
+    Assert.assertNotNull(encoding);
+    Assert.assertEquals(2L, encoding.getId());
+    Assert.assertEquals(new Int(32, true), encoding.getIndexType());
+
+    ListVector.Accessor accessor = vector.getAccessor();
+    Assert.assertEquals(3, accessor.getValueCount());
+    Assert.assertEquals(Arrays.asList(0, 1), accessor.getObject(0));
+    Assert.assertEquals(Arrays.asList(0), accessor.getObject(1));
+    Assert.assertEquals(Arrays.asList(1), accessor.getObject(2));
+
+    Dictionary dictionary = provider.lookup(2L);
+    Assert.assertNotNull(dictionary);
+    NullableVarCharVector.Accessor dictionaryAccessor = ((NullableVarCharVector) dictionary.getVector()).getAccessor();
+    Assert.assertEquals(2, dictionaryAccessor.getValueCount());
+    Assert.assertEquals(new Text("foo"), dictionaryAccessor.getObject(0));
+    Assert.assertEquals(new Text("bar"), dictionaryAccessor.getObject(1));
   }
 
   /**
    * Writes the contents of parent to file. If outStream is non-null, also writes it
    * to outStream in the streaming serialized format.
    */
-  private void write(FieldVector parent, File file, OutputStream outStream) throws FileNotFoundException, IOException {
-    VectorUnloader vectorUnloader = newVectorUnloader(parent);
-    Schema schema = vectorUnloader.getSchema();
-    LOGGER.debug("writing schema: " + schema);
-    try (
-        FileOutputStream fileOutputStream = new FileOutputStream(file);
-        ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
-        ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
-            ) {
-      arrowWriter.writeRecordBatch(recordBatch);
+  private void write(FieldVector parent, File file, OutputStream outStream) throws IOException {
+    VectorSchemaRoot root = new VectorSchemaRoot(parent);
+
+    try (FileOutputStream fileOutputStream = new FileOutputStream(file);
+         ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream.getChannel());) {
+      LOGGER.debug("writing schema: " + root.getSchema());
+      arrowWriter.start();
+      arrowWriter.writeBatch();
+      arrowWriter.end();
     }
 
     // Also try serializing to the stream writer.
     if (outStream != null) {
-      try (
-          ArrowStreamWriter arrowWriter = new ArrowStreamWriter(outStream, schema);
-          ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
-          ) {
-        arrowWriter.writeRecordBatch(recordBatch);
+      try (ArrowStreamWriter arrowWriter = new ArrowStreamWriter(root, null, outStream)) {
+        arrowWriter.start();
+        arrowWriter.writeBatch();
+        arrowWriter.end();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java
index 13b04de..914dfe4 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java
@@ -17,12 +17,15 @@
  */
 package org.apache.arrow.vector.file;
 
+import static java.nio.channels.Channels.newChannel;
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
@@ -34,8 +37,14 @@ import org.apache.arrow.flatbuf.Message;
 import org.apache.arrow.flatbuf.RecordBatch;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.NullableIntVector;
+import org.apache.arrow.vector.NullableTinyIntVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.schema.ArrowFieldNode;
 import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
@@ -69,12 +78,17 @@ public class TestArrowReaderWriter {
   @Test
   public void test() throws IOException {
     Schema schema = new Schema(asList(new Field("testField", true, new ArrowType.Int(8, true), Collections.<Field>emptyList())));
-    byte[] validity = new byte[] { (byte)255, 0};
+    MinorType minorType = Types.getMinorTypeForArrowType(schema.getFields().get(0).getType());
+    FieldVector vector = minorType.getNewVector("testField", allocator, null,null);
+    vector.initializeChildrenFromFields(schema.getFields().get(0).getChildren());
+
+    byte[] validity = new byte[] { (byte) 255, 0};
     // second validity byte is 0, so the last 8 of the 16 values are null and their contents are "undefined"
     byte[] values = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
 
     ByteArrayOutputStream out = new ByteArrayOutputStream();
-    try (ArrowWriter writer = new ArrowWriter(Channels.newChannel(out), schema)) {
+    try (VectorSchemaRoot root = new VectorSchemaRoot(schema.getFields(), asList(vector), 16);
+         ArrowFileWriter writer = new ArrowFileWriter(root, null, newChannel(out))) {
       ArrowBuf validityb = buf(validity);
       ArrowBuf valuesb =  buf(values);
       writer.writeRecordBatch(new ArrowRecordBatch(16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb)));
@@ -82,15 +96,15 @@ public class TestArrowReaderWriter {
 
     byte[] byteArray = out.toByteArray();
 
-    try (ArrowReader reader = new ArrowReader(new ByteArrayReadableSeekableByteChannel(byteArray), allocator)) {
-      ArrowFooter footer = reader.readFooter();
-      Schema readSchema = footer.getSchema();
+    SeekableReadChannel channel = new SeekableReadChannel(new ByteArrayReadableSeekableByteChannel(byteArray));
+    try (ArrowFileReader reader = new ArrowFileReader(channel, allocator)) {
+      Schema readSchema = reader.getVectorSchemaRoot().getSchema();
       assertEquals(schema, readSchema);
       assertTrue(readSchema.getFields().get(0).getTypeLayout().getVectorTypes().toString(), readSchema.getFields().get(0).getTypeLayout().getVectors().size() > 0);
       // TODO: dictionaries
-      List<ArrowBlock> recordBatches = footer.getRecordBatches();
+      List<ArrowBlock> recordBatches = reader.getRecordBlocks();
       assertEquals(1, recordBatches.size());
-      ArrowRecordBatch recordBatch = reader.readRecordBatch(recordBatches.get(0));
+      ArrowRecordBatch recordBatch = (ArrowRecordBatch) reader.readMessage(channel, allocator);
       List<ArrowFieldNode> nodes = recordBatch.getNodes();
       assertEquals(1, nodes.size());
       ArrowFieldNode node = nodes.get(0);