You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/03/16 18:00:00 UTC

[1/4] arrow git commit: ARROW-542: Adding dictionary encoding to FileWriter

Repository: arrow
Updated Branches:
  refs/heads/master 3b650014f -> 49f666e74


http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStream.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStream.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStream.java
new file mode 100644
index 0000000..e7cdf3f
--- /dev/null
+++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStream.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.file;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import io.netty.buffer.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.NullableTinyIntVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.schema.ArrowMessage;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.stream.ArrowStreamReader;
+import org.apache.arrow.vector.stream.ArrowStreamWriter;
+import org.apache.arrow.vector.stream.MessageSerializerTest;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.Test;
+
+public class TestArrowStream extends BaseFileTest {
+  @Test
+  public void testEmptyStream() throws IOException {
+    Schema schema = MessageSerializerTest.testSchema();
+    VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
+
+    // Write the stream.
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    try (ArrowStreamWriter writer = new ArrowStreamWriter(root, null, out)) {
+    }
+
+    ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+    try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) {
+      assertEquals(schema, reader.getVectorSchemaRoot().getSchema());
+      // Empty should return nothing. Can be called repeatedly.
+      reader.loadNextBatch();
+      assertEquals(0, reader.getVectorSchemaRoot().getRowCount());
+      reader.loadNextBatch();
+      assertEquals(0, reader.getVectorSchemaRoot().getRowCount());
+    }
+  }
+
+  @Test
+  public void testReadWrite() throws IOException {
+    Schema schema = MessageSerializerTest.testSchema();
+    try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      int numBatches = 1;
+
+      root.getFieldVectors().get(0).allocateNew();
+      NullableTinyIntVector.Mutator mutator = (NullableTinyIntVector.Mutator) root.getFieldVectors().get(0).getMutator();
+      for (int i = 0; i < 16; i++) {
+        mutator.set(i, i < 8 ? 1 : 0, (byte)(i + 1));
+      }
+      mutator.setValueCount(16);
+      root.setRowCount(16);
+
+      ByteArrayOutputStream out = new ByteArrayOutputStream();
+      long bytesWritten = 0;
+      try (ArrowStreamWriter writer = new ArrowStreamWriter(root, null, out)) {
+        writer.start();
+        for (int i = 0; i < numBatches; i++) {
+          writer.writeBatch();
+        }
+        writer.end();
+        bytesWritten = writer.bytesWritten();
+      }
+
+      ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+      try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) {
+        Schema readSchema = reader.getVectorSchemaRoot().getSchema();
+        assertEquals(schema, readSchema);
+        for (int i = 0; i < numBatches; i++) {
+          reader.loadNextBatch();
+        }
+        // TODO figure out why reader isn't getting padding bytes
+        assertEquals(bytesWritten, reader.bytesRead() + 4);
+        reader.loadNextBatch();
+        assertEquals(0, reader.getVectorSchemaRoot().getRowCount());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java
new file mode 100644
index 0000000..46d4679
--- /dev/null
+++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.file;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.channels.Pipe;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.NullableTinyIntVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.schema.ArrowMessage;
+import org.apache.arrow.vector.stream.ArrowStreamReader;
+import org.apache.arrow.vector.stream.ArrowStreamWriter;
+import org.apache.arrow.vector.stream.MessageSerializerTest;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestArrowStreamPipe {
+  Schema schema = MessageSerializerTest.testSchema();
+  BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
+
+  private final class WriterThread extends Thread {
+
+    private final int numBatches;
+    private final ArrowStreamWriter writer;
+    private final VectorSchemaRoot root;
+
+    public WriterThread(int numBatches, WritableByteChannel sinkChannel)
+        throws IOException {
+      this.numBatches = numBatches;
+      BufferAllocator allocator = alloc.newChildAllocator("writer thread", 0, Integer.MAX_VALUE);
+      root = VectorSchemaRoot.create(schema, allocator);
+      writer = new ArrowStreamWriter(root, null, sinkChannel);
+    }
+
+    @Override
+    public void run() {
+      try {
+        writer.start();
+        for (int j = 0; j < numBatches; j++) {
+          root.getFieldVectors().get(0).allocateNew();
+          NullableTinyIntVector.Mutator mutator = (NullableTinyIntVector.Mutator) root.getFieldVectors().get(0).getMutator();
+          // Send a changing batch id first
+          mutator.set(0, j);
+          for (int i = 1; i < 16; i++) {
+            mutator.set(i, i < 8 ? 1 : 0, (byte)(i + 1));
+          }
+          mutator.setValueCount(16);
+          root.setRowCount(16);
+
+          writer.writeBatch();
+        }
+        writer.close();
+        root.close();
+      } catch (IOException e) {
+        e.printStackTrace();
+        Assert.fail(e.toString()); // have to explicitly fail since we're in a separate thread
+      }
+    }
+
+    public long bytesWritten() { return writer.bytesWritten(); }
+  }
+
+  private final class ReaderThread extends Thread {
+    private int batchesRead = 0;
+    private final ArrowStreamReader reader;
+    private final BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
+    private boolean done = false;
+
+    public ReaderThread(ReadableByteChannel sourceChannel)
+        throws IOException {
+      reader = new ArrowStreamReader(sourceChannel, alloc) {
+        @Override
+        protected ArrowMessage readMessage(ReadChannel in, BufferAllocator allocator) throws IOException {
+          // Read all the batches. Each batch contains an incrementing id and then some
+          // constant data. Verify both.
+          ArrowMessage message = super.readMessage(in, allocator);
+          if (message == null) {
+            done = true;
+          } else {
+            batchesRead++;
+          }
+          return message;
+        }
+        @Override
+        public void loadNextBatch() throws IOException {
+          super.loadNextBatch();
+          if (!done) {
+            VectorSchemaRoot root = getVectorSchemaRoot();
+            Assert.assertEquals(16, root.getRowCount());
+            NullableTinyIntVector vector = (NullableTinyIntVector) root.getFieldVectors().get(0);
+            Assert.assertEquals((byte)(batchesRead - 1), vector.getAccessor().get(0));
+            for (int i = 1; i < 16; i++) {
+              if (i < 8) {
+                Assert.assertEquals((byte)(i + 1), vector.getAccessor().get(i));
+              } else {
+                Assert.assertTrue(vector.getAccessor().isNull(i));
+              }
+            }
+          }
+        }
+      };
+    }
+
+    @Override
+    public void run() {
+      try {
+        assertEquals(schema, reader.getVectorSchemaRoot().getSchema());
+        assertTrue(
+            reader.getVectorSchemaRoot().getSchema().getFields().get(0).getTypeLayout().getVectorTypes().toString(),
+            reader.getVectorSchemaRoot().getSchema().getFields().get(0).getTypeLayout().getVectors().size() > 0);
+        while (!done) {
+          reader.loadNextBatch();
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        Assert.fail(e.toString()); // have to explicitly fail since we're in a separate thread
+      }
+    }
+
+    public int getBatchesRead() { return batchesRead; }
+    public long bytesRead() { return reader.bytesRead(); }
+  }
+
+  // Starts up a producer and consumer thread to read/write batches.
+  @Test
+  public void pipeTest() throws IOException, InterruptedException {
+    int NUM_BATCHES = 10;
+    Pipe pipe = Pipe.open();
+    WriterThread writer = new WriterThread(NUM_BATCHES, pipe.sink());
+    ReaderThread reader = new ReaderThread(pipe.source());
+
+    writer.start();
+    reader.start();
+    reader.join();
+    writer.join();
+
+    assertEquals(NUM_BATCHES, reader.getBatchesRead());
+    assertEquals(writer.bytesWritten(), reader.bytesRead());
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java b/java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java
index 3720a13..c88958c 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java
@@ -70,7 +70,7 @@ public class TestJSONFile extends BaseFileTest {
     int count = COUNT;
     try (
         BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
-        NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null)) {
+        NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null, null)) {
       writeComplexData(count, parent);
       VectorSchemaRoot root = new VectorSchemaRoot(parent.getChild("root"));
       validateComplexContent(root.getRowCount(), root);
@@ -92,7 +92,7 @@ public class TestJSONFile extends BaseFileTest {
     int count = COUNT;
     try (
         BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
-        NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null)) {
+        NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null, null)) {
 
       writeUnionData(count, parent);
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java b/java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java
index 7b4de80..bb2ccf8 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java
@@ -34,6 +34,7 @@ import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.file.ReadChannel;
 import org.apache.arrow.vector.file.WriteChannel;
 import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.schema.ArrowMessage;
 import org.apache.arrow.vector.schema.ArrowRecordBatch;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
@@ -88,9 +89,10 @@ public class MessageSerializerTest {
     MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), batch);
 
     ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
-    ArrowRecordBatch deserialized = MessageSerializer.deserializeRecordBatch(
-        new ReadChannel(Channels.newChannel(in)), alloc);
-    verifyBatch(deserialized, validity, values);
+    ReadChannel channel = new ReadChannel(Channels.newChannel(in));
+    ArrowMessage deserialized = MessageSerializer.deserializeMessageBatch(channel, alloc);
+    assertEquals(ArrowRecordBatch.class, deserialized.getClass());
+    verifyBatch((ArrowRecordBatch) deserialized, validity, values);
   }
 
   public static Schema testSchema() {

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStream.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStream.java b/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStream.java
deleted file mode 100644
index 725272a..0000000
--- a/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStream.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.arrow.vector.stream;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.file.BaseFileTest;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
-import org.apache.arrow.vector.types.pojo.Schema;
-import org.junit.Test;
-
-import io.netty.buffer.ArrowBuf;
-
-public class TestArrowStream extends BaseFileTest {
-  @Test
-  public void testEmptyStream() throws IOException {
-    Schema schema = MessageSerializerTest.testSchema();
-
-    // Write the stream.
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    try (ArrowStreamWriter writer = new ArrowStreamWriter(out, schema)) {
-    }
-
-    ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
-    try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) {
-      reader.init();
-      assertEquals(schema, reader.getSchema());
-      // Empty should return null. Can be called repeatedly.
-      assertTrue(reader.nextRecordBatch() == null);
-      assertTrue(reader.nextRecordBatch() == null);
-    }
-  }
-
-  @Test
-  public void testReadWrite() throws IOException {
-    Schema schema = MessageSerializerTest.testSchema();
-    byte[] validity = new byte[] { (byte)255, 0};
-    // second half is "undefined"
-    byte[] values = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
-
-    int numBatches = 5;
-    BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    long bytesWritten = 0;
-    try (ArrowStreamWriter writer = new ArrowStreamWriter(out, schema)) {
-      ArrowBuf validityb = MessageSerializerTest.buf(alloc, validity);
-      ArrowBuf valuesb =  MessageSerializerTest.buf(alloc, values);
-      for (int i = 0; i < numBatches; i++) {
-        writer.writeRecordBatch(new ArrowRecordBatch(
-            16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb)));
-      }
-      bytesWritten = writer.bytesWritten();
-    }
-
-    ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
-    try (ArrowStreamReader reader = new ArrowStreamReader(in, alloc)) {
-      reader.init();
-      Schema readSchema = reader.getSchema();
-      for (int i = 0; i < numBatches; i++) {
-        assertEquals(schema, readSchema);
-        assertTrue(
-            readSchema.getFields().get(0).getTypeLayout().getVectorTypes().toString(),
-            readSchema.getFields().get(0).getTypeLayout().getVectors().size() > 0);
-        ArrowRecordBatch recordBatch = reader.nextRecordBatch();
-        MessageSerializerTest.verifyBatch(recordBatch, validity, values);
-        assertTrue(recordBatch != null);
-      }
-      assertTrue(reader.nextRecordBatch() == null);
-      assertEquals(bytesWritten, reader.bytesRead());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java b/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java
deleted file mode 100644
index aa0b77e..0000000
--- a/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.arrow.vector.stream;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.nio.channels.Pipe;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
-import org.apache.arrow.vector.types.pojo.Schema;
-import org.junit.Test;
-
-import io.netty.buffer.ArrowBuf;
-
-public class TestArrowStreamPipe {
-  Schema schema = MessageSerializerTest.testSchema();
-  // second half is "undefined"
-  byte[] values = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
-
-  private final class WriterThread extends Thread {
-    private final int numBatches;
-    private final ArrowStreamWriter writer;
-
-    public WriterThread(int numBatches, WritableByteChannel sinkChannel)
-        throws IOException {
-      this.numBatches = numBatches;
-      writer = new ArrowStreamWriter(sinkChannel, schema);
-    }
-
-    @Override
-    public void run() {
-      BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
-      try {
-        ArrowBuf valuesb =  MessageSerializerTest.buf(alloc, values);
-        for (int i = 0; i < numBatches; i++) {
-          // Send a changing byte id first.
-          byte[] validity = new byte[] { (byte)i, 0};
-          ArrowBuf validityb = MessageSerializerTest.buf(alloc, validity);
-          writer.writeRecordBatch(new ArrowRecordBatch(
-              16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb)));
-        }
-        writer.close();
-      } catch (IOException e) {
-        e.printStackTrace();
-        assertTrue(false);
-      }
-    }
-
-    public long bytesWritten() { return writer.bytesWritten(); }
-  }
-
-  private final class ReaderThread extends Thread {
-    private int batchesRead = 0;
-    private final ArrowStreamReader reader;
-    private final BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
-
-    public ReaderThread(ReadableByteChannel sourceChannel)
-        throws IOException {
-      reader = new ArrowStreamReader(sourceChannel, alloc);
-    }
-
-    @Override
-    public void run() {
-      try {
-        reader.init();
-        assertEquals(schema, reader.getSchema());
-        assertTrue(
-            reader.getSchema().getFields().get(0).getTypeLayout().getVectorTypes().toString(),
-            reader.getSchema().getFields().get(0).getTypeLayout().getVectors().size() > 0);
-
-        // Read all the batches. Each batch contains an incrementing id and then some
-        // constant data. Verify both.
-        while (true) {
-          ArrowRecordBatch batch = reader.nextRecordBatch();
-          if (batch == null) break;
-          byte[] validity = new byte[] { (byte)batchesRead, 0};
-          MessageSerializerTest.verifyBatch(batch, validity, values);
-          batchesRead++;
-        }
-      } catch (IOException e) {
-        e.printStackTrace();
-        assertTrue(false);
-      }
-    }
-
-    public int getBatchesRead() { return batchesRead; }
-    public long bytesRead() { return reader.bytesRead(); }
-  }
-
-  // Starts up a producer and consumer thread to read/write batches.
-  @Test
-  public void pipeTest() throws IOException, InterruptedException {
-    int NUM_BATCHES = 10;
-    Pipe pipe = Pipe.open();
-    WriterThread writer = new WriterThread(NUM_BATCHES, pipe.sink());
-    ReaderThread reader = new ReaderThread(pipe.source());
-
-    writer.start();
-    reader.start();
-    reader.join();
-    writer.join();
-
-    assertEquals(NUM_BATCHES, reader.getBatchesRead());
-    assertEquals(writer.bytesWritten(), reader.bytesRead());
-  }
-}


[4/4] arrow git commit: ARROW-542: Adding dictionary encoding to FileWriter

Posted by we...@apache.org.
ARROW-542: Adding dictionary encoding to FileWriter

WIP for comments

Author: Emilio Lahr-Vivaz <el...@ccri.com>
Author: Wes McKinney <we...@twosigma.com>

Closes #334 from elahrvivaz/ARROW-542 and squashes the following commits:

5339730 [Emilio Lahr-Vivaz] fixing bitvector load of value count, adding struct integration test
00d78d3 [Emilio Lahr-Vivaz] fixing set bit validity value in NullableMapVector load
1679934 [Emilio Lahr-Vivaz] cleaning up license
70639e0 [Emilio Lahr-Vivaz] restoring vector loader test
bde4eee [Wes McKinney] Handle 0-length message indicator for EOS in C++ StreamReader
a24854b [Emilio Lahr-Vivaz] fixing StreamToFile conversion
2ee7cfb [Emilio Lahr-Vivaz] fixing FileToStream conversion
adec200 [Emilio Lahr-Vivaz] making arrow magic static, cleanup
8366288 [Emilio Lahr-Vivaz] making magic array private
127937f [Emilio Lahr-Vivaz] removing qualifier for magic
db9a007 [Emilio Lahr-Vivaz] adding dictionary tests to echo server
95c7b2a [Emilio Lahr-Vivaz] cleanup
45caa02 [Emilio Lahr-Vivaz] reverting basewriter dictionary methods
682db6f [Emilio Lahr-Vivaz] cleanup
a1508b9 [Emilio Lahr-Vivaz] removing dictionary vector method (instead use field.dictionary)
43c28af [Emilio Lahr-Vivaz] adding test for nested dictionary encoded list
92a1e6f [Emilio Lahr-Vivaz] fixing imports
e567564 [Emilio Lahr-Vivaz] adding field size check in vectorschemaroot
568fda5 [Emilio Lahr-Vivaz] imports, formatting
363308e [Emilio Lahr-Vivaz] fixing tests
2f69be1 [Emilio Lahr-Vivaz] not passing around dictionary vectors with dictionary fields, adding dictionary encoding to fields, restoring vector loader/unloader
e5c8e02 [Emilio Lahr-Vivaz] Merging dictionary unloader/loader with arrow writer/reader Creating base class for stream/file writer Creating base class with visitors for arrow messages Indentation fixes Other cleanup
d095f3f [Emilio Lahr-Vivaz] ARROW-542: Adding dictionary encoding to file and stream writing


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/49f666e7
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/49f666e7
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/49f666e7

Branch: refs/heads/master
Commit: 49f666e740208d1e6167537f141f27b6b78b77cb
Parents: 3b65001
Author: Emilio Lahr-Vivaz <el...@ccri.com>
Authored: Thu Mar 16 13:59:53 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Thu Mar 16 13:59:53 2017 -0400

----------------------------------------------------------------------
 cpp/src/arrow/ipc/reader.cc                     |   6 +
 integration/integration_test.py                 |   4 +
 .../java/org/apache/arrow/tools/EchoServer.java |  48 +-
 .../org/apache/arrow/tools/FileRoundtrip.java   |  48 +-
 .../org/apache/arrow/tools/FileToStream.java    |  27 +-
 .../org/apache/arrow/tools/Integration.java     |  83 +--
 .../org/apache/arrow/tools/StreamToFile.java    |  19 +-
 .../arrow/tools/ArrowFileTestFixtures.java      |  51 +-
 .../org/apache/arrow/tools/EchoServerTest.java  | 280 ++++++--
 .../org/apache/arrow/tools/TestIntegration.java |  38 +-
 java/tools/tmptestfilesio                       | Bin 0 -> 628 bytes
 .../src/main/codegen/templates/MapWriters.java  |   8 +-
 .../codegen/templates/NullableValueVectors.java |  40 +-
 .../src/main/codegen/templates/UnionVector.java |  10 +-
 .../java/org/apache/arrow/vector/BitVector.java |   2 +-
 .../org/apache/arrow/vector/FieldVector.java    |   4 +-
 .../org/apache/arrow/vector/VectorLoader.java   |  13 +-
 .../apache/arrow/vector/VectorSchemaRoot.java   |  32 +-
 .../org/apache/arrow/vector/VectorUnloader.java |  27 +-
 .../vector/complex/AbstractContainerVector.java |   3 +-
 .../arrow/vector/complex/AbstractMapVector.java |   9 +-
 .../vector/complex/BaseRepeatedValueVector.java |   5 +-
 .../arrow/vector/complex/DictionaryVector.java  | 229 -------
 .../apache/arrow/vector/complex/ListVector.java |  26 +-
 .../apache/arrow/vector/complex/MapVector.java  |   5 +-
 .../arrow/vector/complex/NullableMapVector.java |   9 +-
 .../vector/complex/impl/ComplexWriterImpl.java  |   6 +-
 .../vector/complex/impl/PromotableWriter.java   |   5 +-
 .../arrow/vector/dictionary/Dictionary.java     |  66 ++
 .../vector/dictionary/DictionaryEncoder.java    | 144 ++++
 .../vector/dictionary/DictionaryProvider.java   |  47 ++
 .../arrow/vector/file/ArrowFileReader.java      | 142 ++++
 .../arrow/vector/file/ArrowFileWriter.java      |  59 ++
 .../apache/arrow/vector/file/ArrowFooter.java   |   1 -
 .../apache/arrow/vector/file/ArrowMagic.java    |  37 ++
 .../apache/arrow/vector/file/ArrowReader.java   | 222 +++++--
 .../apache/arrow/vector/file/ArrowWriter.java   | 173 +++--
 .../apache/arrow/vector/file/ReadChannel.java   |  11 +-
 .../arrow/vector/file/SeekableReadChannel.java  |  39 ++
 .../apache/arrow/vector/file/WriteChannel.java  |   7 +-
 .../arrow/vector/file/json/JsonFileReader.java  |  26 +-
 .../vector/schema/ArrowDictionaryBatch.java     |  60 ++
 .../arrow/vector/schema/ArrowMessage.java       |  30 +
 .../arrow/vector/schema/ArrowRecordBatch.java   |   8 +-
 .../arrow/vector/stream/ArrowStreamReader.java  |  88 +--
 .../arrow/vector/stream/ArrowStreamWriter.java  |  75 +--
 .../arrow/vector/stream/MessageSerializer.java  | 164 ++++-
 .../apache/arrow/vector/types/Dictionary.java   |  40 --
 .../org/apache/arrow/vector/types/Types.java    | 114 ++--
 .../vector/types/pojo/DictionaryEncoding.java   |  51 ++
 .../apache/arrow/vector/types/pojo/Field.java   |  59 +-
 .../apache/arrow/vector/TestDecimalVector.java  |   2 +-
 .../arrow/vector/TestDictionaryVector.java      |  82 +--
 .../org/apache/arrow/vector/TestListVector.java |   4 +-
 .../apache/arrow/vector/TestValueVector.java    |  12 +-
 .../arrow/vector/TestVectorUnloadLoad.java      |  22 +-
 .../complex/impl/TestPromotableWriter.java      |   2 +-
 .../complex/writer/TestComplexWriter.java       |  14 +-
 .../apache/arrow/vector/file/TestArrowFile.java | 665 ++++++++++++-------
 .../vector/file/TestArrowReaderWriter.java      |  28 +-
 .../arrow/vector/file/TestArrowStream.java      | 102 +++
 .../arrow/vector/file/TestArrowStreamPipe.java  | 163 +++++
 .../arrow/vector/file/json/TestJSONFile.java    |   4 +-
 .../vector/stream/MessageSerializerTest.java    |   8 +-
 .../arrow/vector/stream/TestArrowStream.java    |  96 ---
 .../vector/stream/TestArrowStreamPipe.java      | 129 ----
 66 files changed, 2522 insertions(+), 1511 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/cpp/src/arrow/ipc/reader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 9734166..4cb5f6c 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -78,6 +78,12 @@ class StreamReader::StreamReaderImpl {
 
     int32_t message_length = *reinterpret_cast<const int32_t*>(buffer->data());
 
+    if (message_length == 0) {
+      // Optional 0 EOS control message
+      *message = nullptr;
+      return Status::OK();
+    }
+
     RETURN_NOT_OK(stream_->Read(message_length, &buffer));
     if (buffer->size() != message_length) {
       return Status::IOError("Unexpected end of stream trying to read message");

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/integration/integration_test.py
----------------------------------------------------------------------
diff --git a/integration/integration_test.py b/integration/integration_test.py
index 049436a..5cd63c5 100644
--- a/integration/integration_test.py
+++ b/integration/integration_test.py
@@ -680,12 +680,16 @@ class JavaTester(Tester):
         cmd = ['java', '-cp', self.ARROW_TOOLS_JAR,
                'org.apache.arrow.tools.StreamToFile',
                stream_path, file_path]
+        if self.debug:
+            print(' '.join(cmd))
         run_cmd(cmd)
 
     def file_to_stream(self, file_path, stream_path):
         cmd = ['java', '-cp', self.ARROW_TOOLS_JAR,
                'org.apache.arrow.tools.FileToStream',
                file_path, stream_path]
+        if self.debug:
+            print(' '.join(cmd))
         run_cmd(cmd)
 
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
----------------------------------------------------------------------
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
index c00620e..7c0cadd 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
@@ -18,23 +18,19 @@
 package org.apache.arrow.tools;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.ServerSocket;
 import java.net.Socket;
-import java.util.ArrayList;
-import java.util.List;
+
+import com.google.common.base.Preconditions;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.stream.ArrowStreamReader;
 import org.apache.arrow.vector.stream.ArrowStreamWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
 public class EchoServer {
   private static final Logger LOGGER = LoggerFactory.getLogger(EchoServer.class);
 
@@ -57,30 +53,28 @@ public class EchoServer {
 
     public void run() throws IOException {
       BufferAllocator  allocator = new RootAllocator(Long.MAX_VALUE);
-      List<ArrowRecordBatch> batches = new ArrayList<ArrowRecordBatch>();
-      try (
-        InputStream in = socket.getInputStream();
-        OutputStream out = socket.getOutputStream();
-        ArrowStreamReader reader = new ArrowStreamReader(in, allocator);
-      ) {
-        // Read the entire input stream.
-        reader.init();
-        while (true) {
-          ArrowRecordBatch batch = reader.nextRecordBatch();
-          if (batch == null) break;
-          batches.add(batch);
-        }
-        LOGGER.info(String.format("Received %d batches", batches.size()));
-
-        // Write it back
-        try (ArrowStreamWriter writer = new ArrowStreamWriter(out, reader.getSchema())) {
-          for (ArrowRecordBatch batch: batches) {
-            writer.writeRecordBatch(batch);
+      // Read the entire input stream and write it back
+      try (ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) {
+        VectorSchemaRoot root = reader.getVectorSchemaRoot();
+        // load the first batch before instantiating the writer so that we have any dictionaries
+        reader.loadNextBatch();
+        try (ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, socket.getOutputStream())) {
+          writer.start();
+          int echoed = 0;
+          while (true) {
+            int rowCount = reader.getVectorSchemaRoot().getRowCount();
+            if (rowCount == 0) {
+              break;
+            } else {
+              writer.writeBatch();
+              echoed += rowCount;
+              reader.loadNextBatch();
+            }
           }
           writer.end();
           Preconditions.checkState(reader.bytesRead() == writer.bytesWritten());
+          LOGGER.info(String.format("Echoed %d records", echoed));
         }
-        LOGGER.info("Done writing stream back.");
       }
     }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
----------------------------------------------------------------------
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
index db7a1c2..9fa7b76 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
@@ -23,18 +23,12 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
-import java.util.List;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.VectorLoader;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.VectorUnloader;
-import org.apache.arrow.vector.file.ArrowBlock;
-import org.apache.arrow.vector.file.ArrowFooter;
-import org.apache.arrow.vector.file.ArrowReader;
-import org.apache.arrow.vector.file.ArrowWriter;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.file.ArrowFileReader;
+import org.apache.arrow.vector.file.ArrowFileWriter;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -86,35 +80,27 @@ public class FileRoundtrip {
       File inFile = validateFile("input", inFileName);
       File outFile = validateFile("output", outFileName);
       BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); // TODO: close
-      try(
-          FileInputStream fileInputStream = new FileInputStream(inFile);
-          ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);) {
+      try (FileInputStream fileInputStream = new FileInputStream(inFile);
+           ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) {
 
-        ArrowFooter footer = arrowReader.readFooter();
-        Schema schema = footer.getSchema();
+        VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+        Schema schema = root.getSchema();
         LOGGER.debug("Input file size: " + inFile.length());
         LOGGER.debug("Found schema: " + schema);
 
-        try (
-            FileOutputStream fileOutputStream = new FileOutputStream(outFile);
-            ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
-            ) {
-
-          // initialize vectors
-
-          List<ArrowBlock> recordBatches = footer.getRecordBatches();
-          for (ArrowBlock rbBlock : recordBatches) {
-            try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock);
-                VectorSchemaRoot root = new VectorSchemaRoot(schema, allocator);) {
-
-              VectorLoader vectorLoader = new VectorLoader(root);
-              vectorLoader.load(inRecordBatch);
-
-              VectorUnloader vectorUnloader = new VectorUnloader(root);
-              ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
-              arrowWriter.writeRecordBatch(recordBatch);
+        try (FileOutputStream fileOutputStream = new FileOutputStream(outFile);
+             ArrowFileWriter arrowWriter = new ArrowFileWriter(root, arrowReader, fileOutputStream.getChannel())) {
+          arrowWriter.start();
+          while (true) {
+            arrowReader.loadNextBatch();
+            int loaded = root.getRowCount();
+            if (loaded == 0) {
+              break;
+            } else {
+              arrowWriter.writeBatch();
             }
           }
+          arrowWriter.end();
         }
         LOGGER.debug("Output file size: " + outFile.length());
       }

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
----------------------------------------------------------------------
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
index ba6505c..d534553 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
@@ -25,10 +25,8 @@ import java.io.OutputStream;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.file.ArrowBlock;
-import org.apache.arrow.vector.file.ArrowFooter;
-import org.apache.arrow.vector.file.ArrowReader;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.file.ArrowFileReader;
 import org.apache.arrow.vector.stream.ArrowStreamWriter;
 
 /**
@@ -36,19 +34,20 @@ import org.apache.arrow.vector.stream.ArrowStreamWriter;
  * first argument and the output is written to standard out.
  */
 public class FileToStream {
+
   public static void convert(FileInputStream in, OutputStream out) throws IOException {
     BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
-    try(
-        ArrowReader reader = new ArrowReader(in.getChannel(), allocator);) {
-      ArrowFooter footer = reader.readFooter();
-      try (
-        ArrowStreamWriter writer = new ArrowStreamWriter(out, footer.getSchema());
-      ) {
-        for (ArrowBlock block: footer.getRecordBatches()) {
-          try (ArrowRecordBatch batch = reader.readRecordBatch(block)) {
-            writer.writeRecordBatch(batch);
-          }
+    try (ArrowFileReader reader = new ArrowFileReader(in.getChannel(), allocator)) {
+      VectorSchemaRoot root = reader.getVectorSchemaRoot();
+      // load the first batch before instantiating the writer so that we have any dictionaries
+      reader.loadNextBatch();
+      try (ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, out)) {
+        writer.start();
+        while (root.getRowCount() > 0) {
+          writer.writeBatch();
+          reader.loadNextBatch();
         }
+        writer.end();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
----------------------------------------------------------------------
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
index 36d4ee5..5d4849c 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
@@ -28,16 +28,12 @@ import java.util.List;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.VectorLoader;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.VectorUnloader;
 import org.apache.arrow.vector.file.ArrowBlock;
-import org.apache.arrow.vector.file.ArrowFooter;
-import org.apache.arrow.vector.file.ArrowReader;
-import org.apache.arrow.vector.file.ArrowWriter;
+import org.apache.arrow.vector.file.ArrowFileReader;
+import org.apache.arrow.vector.file.ArrowFileWriter;
 import org.apache.arrow.vector.file.json.JsonFileReader;
 import org.apache.arrow.vector.file.json.JsonFileWriter;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.arrow.vector.util.Validator;
 import org.apache.commons.cli.CommandLine;
@@ -69,24 +65,18 @@ public class Integration {
     ARROW_TO_JSON(true, false) {
       @Override
       public void execute(File arrowFile, File jsonFile) throws IOException {
-        try(
-            BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+        try(BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
             FileInputStream fileInputStream = new FileInputStream(arrowFile);
-            ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);) {
-          ArrowFooter footer = arrowReader.readFooter();
-          Schema schema = footer.getSchema();
+            ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) {
+          VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+          Schema schema = root.getSchema();
           LOGGER.debug("Input file size: " + arrowFile.length());
           LOGGER.debug("Found schema: " + schema);
-          try (JsonFileWriter writer = new JsonFileWriter(jsonFile, JsonFileWriter.config().pretty(true));) {
+          try (JsonFileWriter writer = new JsonFileWriter(jsonFile, JsonFileWriter.config().pretty(true))) {
             writer.start(schema);
-            List<ArrowBlock> recordBatches = footer.getRecordBatches();
-            for (ArrowBlock rbBlock : recordBatches) {
-              try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock);
-                  VectorSchemaRoot root = new VectorSchemaRoot(schema, allocator);) {
-                VectorLoader vectorLoader = new VectorLoader(root);
-                vectorLoader.load(inRecordBatch);
-                writer.write(root);
-              }
+            for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) {
+              arrowReader.loadRecordBatch(rbBlock);
+              writer.write(root);
             }
           }
           LOGGER.debug("Output file size: " + jsonFile.length());
@@ -96,27 +86,22 @@ public class Integration {
     JSON_TO_ARROW(false, true) {
       @Override
       public void execute(File arrowFile, File jsonFile) throws IOException {
-        try (
-            BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
-            JsonFileReader reader = new JsonFileReader(jsonFile, allocator);
-            ) {
+        try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+             JsonFileReader reader = new JsonFileReader(jsonFile, allocator)) {
           Schema schema = reader.start();
           LOGGER.debug("Input file size: " + jsonFile.length());
           LOGGER.debug("Found schema: " + schema);
-          try (
-              FileOutputStream fileOutputStream = new FileOutputStream(arrowFile);
-              ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
-              ) {
-
-            // initialize vectors
-            VectorSchemaRoot root;
-            while ((root = reader.read()) != null) {
-              VectorUnloader vectorUnloader = new VectorUnloader(root);
-              try (ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();) {
-                arrowWriter.writeRecordBatch(recordBatch);
-              }
-              root.close();
+          try (FileOutputStream fileOutputStream = new FileOutputStream(arrowFile);
+               VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
+               // TODO json dictionaries
+               ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream.getChannel())) {
+            arrowWriter.start();
+            reader.read(root);
+            while (root.getRowCount() != 0) {
+              arrowWriter.writeBatch();
+              reader.read(root);
             }
+            arrowWriter.end();
           }
           LOGGER.debug("Output file size: " + arrowFile.length());
         }
@@ -125,32 +110,26 @@ public class Integration {
     VALIDATE(true, true) {
       @Override
       public void execute(File arrowFile, File jsonFile) throws IOException {
-        try (
-            BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
-            JsonFileReader jsonReader = new JsonFileReader(jsonFile, allocator);
-            FileInputStream fileInputStream = new FileInputStream(arrowFile);
-            ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);
-            ) {
+        try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+             JsonFileReader jsonReader = new JsonFileReader(jsonFile, allocator);
+             FileInputStream fileInputStream = new FileInputStream(arrowFile);
+             ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) {
           Schema jsonSchema = jsonReader.start();
-          ArrowFooter footer = arrowReader.readFooter();
-          Schema arrowSchema = footer.getSchema();
+          VectorSchemaRoot arrowRoot = arrowReader.getVectorSchemaRoot();
+          Schema arrowSchema = arrowRoot.getSchema();
           LOGGER.debug("Arrow Input file size: " + arrowFile.length());
           LOGGER.debug("ARROW schema: " + arrowSchema);
           LOGGER.debug("JSON Input file size: " + jsonFile.length());
           LOGGER.debug("JSON schema: " + jsonSchema);
           Validator.compareSchemas(jsonSchema, arrowSchema);
 
-          List<ArrowBlock> recordBatches = footer.getRecordBatches();
+          List<ArrowBlock> recordBatches = arrowReader.getRecordBlocks();
           Iterator<ArrowBlock> iterator = recordBatches.iterator();
           VectorSchemaRoot jsonRoot;
           while ((jsonRoot = jsonReader.read()) != null && iterator.hasNext()) {
             ArrowBlock rbBlock = iterator.next();
-            try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock);
-                VectorSchemaRoot arrowRoot = new VectorSchemaRoot(arrowSchema, allocator);) {
-              VectorLoader vectorLoader = new VectorLoader(arrowRoot);
-              vectorLoader.load(inRecordBatch);
-              Validator.compareVectorSchemaRoot(arrowRoot, jsonRoot);
-            }
+            arrowReader.loadRecordBatch(rbBlock);
+            Validator.compareVectorSchemaRoot(arrowRoot, jsonRoot);
             jsonRoot.close();
           }
           boolean hasMoreJSON = jsonRoot != null;

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
----------------------------------------------------------------------
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
index c8a5c89..3b79d5b 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
@@ -27,8 +27,8 @@ import java.nio.channels.Channels;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.file.ArrowWriter;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.file.ArrowFileWriter;
 import org.apache.arrow.vector.stream.ArrowStreamReader;
 
 /**
@@ -38,13 +38,16 @@ public class StreamToFile {
   public static void convert(InputStream in, OutputStream out) throws IOException {
     BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
     try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) {
-      reader.init();
-      try (ArrowWriter writer = new ArrowWriter(Channels.newChannel(out), reader.getSchema());) {
-        while (true) {
-          ArrowRecordBatch batch = reader.nextRecordBatch();
-          if (batch == null) break;
-          writer.writeRecordBatch(batch);
+      VectorSchemaRoot root = reader.getVectorSchemaRoot();
+      // load the first batch before instantiating the writer so that we have any dictionaries
+      reader.loadNextBatch();
+      try (ArrowFileWriter writer = new ArrowFileWriter(root, reader, Channels.newChannel(out))) {
+        writer.start();
+        while (root.getRowCount() > 0) {
+          writer.writeBatch();
+          reader.loadNextBatch();
         }
+        writer.end();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java
----------------------------------------------------------------------
diff --git a/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java b/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java
index 4cfc52f..f752f7e 100644
--- a/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java
+++ b/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java
@@ -23,13 +23,10 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.List;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.VectorLoader;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.VectorUnloader;
 import org.apache.arrow.vector.complex.MapVector;
 import org.apache.arrow.vector.complex.impl.ComplexWriterImpl;
 import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter;
@@ -37,10 +34,8 @@ import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
 import org.apache.arrow.vector.complex.writer.BigIntWriter;
 import org.apache.arrow.vector.complex.writer.IntWriter;
 import org.apache.arrow.vector.file.ArrowBlock;
-import org.apache.arrow.vector.file.ArrowFooter;
-import org.apache.arrow.vector.file.ArrowReader;
-import org.apache.arrow.vector.file.ArrowWriter;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.file.ArrowFileReader;
+import org.apache.arrow.vector.file.ArrowFileWriter;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.junit.Assert;
 
@@ -63,26 +58,14 @@ public class ArrowFileTestFixtures {
 
   static void validateOutput(File testOutFile, BufferAllocator allocator) throws Exception {
     // read
-    try (
-        BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
-        FileInputStream fileInputStream = new FileInputStream(testOutFile);
-        ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator);
-        BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
-        ) {
-      ArrowFooter footer = arrowReader.readFooter();
-      Schema schema = footer.getSchema();
-
-      // initialize vectors
-      try (VectorSchemaRoot root = new VectorSchemaRoot(schema, readerAllocator)) {
-        VectorLoader vectorLoader = new VectorLoader(root);
-
-        List<ArrowBlock> recordBatches = footer.getRecordBatches();
-        for (ArrowBlock rbBlock : recordBatches) {
-          try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
-            vectorLoader.load(recordBatch);
-          }
-          validateContent(COUNT, root);
-        }
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         FileInputStream fileInputStream = new FileInputStream(testOutFile);
+         ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
+      for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) {
+        arrowReader.loadRecordBatch(rbBlock);
+        validateContent(COUNT, root);
       }
     }
   }
@@ -96,16 +79,10 @@ public class ArrowFileTestFixtures {
   }
 
   static void write(FieldVector parent, File file) throws FileNotFoundException, IOException {
-    Schema schema = new Schema(parent.getField().getChildren());
-    int valueCount = parent.getAccessor().getValueCount();
-    List<FieldVector> fields = parent.getChildrenFromFields();
-    VectorUnloader vectorUnloader = new VectorUnloader(schema, valueCount, fields);
-    try (
-        FileOutputStream fileOutputStream = new FileOutputStream(file);
-        ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
-        ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
-            ) {
-      arrowWriter.writeRecordBatch(recordBatch);
+    VectorSchemaRoot root = new VectorSchemaRoot(parent);
+    try (FileOutputStream fileOutputStream = new FileOutputStream(file);
+         ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream.getChannel())) {
+      arrowWriter.writeBatch();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
----------------------------------------------------------------------
diff --git a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
index 48d6162..706f8e2 100644
--- a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
+++ b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
@@ -24,106 +24,268 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.net.Socket;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import com.google.common.collect.ImmutableList;
+
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.NullableIntVector;
+import org.apache.arrow.vector.NullableTinyIntVector;
+import org.apache.arrow.vector.NullableVarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.impl.UnionListWriter;
+import org.apache.arrow.vector.dictionary.Dictionary;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider;
 import org.apache.arrow.vector.stream.ArrowStreamReader;
 import org.apache.arrow.vector.stream.ArrowStreamWriter;
+import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.ArrowType.Int;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.Text;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
-import io.netty.buffer.ArrowBuf;
-
 public class EchoServerTest {
-  public static ArrowBuf buf(BufferAllocator alloc, byte[] bytes) {
-    ArrowBuf buffer = alloc.buffer(bytes.length);
-    buffer.writeBytes(bytes);
-    return buffer;
+
+  private static EchoServer server;
+  private static int serverPort;
+  private static Thread serverThread;
+
+  @BeforeClass
+  public static void startEchoServer() throws IOException {
+    server = new EchoServer(0);
+    serverPort = server.port();
+    serverThread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          server.run();
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    };
+    serverThread.start();
   }
 
-  public static byte[] array(ArrowBuf buf) {
-    byte[] bytes = new byte[buf.readableBytes()];
-    buf.readBytes(bytes);
-    return bytes;
+  @AfterClass
+  public static void stopEchoServer() throws IOException, InterruptedException {
+    server.close();
+    serverThread.join();
   }
 
-  private void testEchoServer(int serverPort, Schema schema, List<ArrowRecordBatch> batches)
+  private void testEchoServer(int serverPort,
+                              Field field,
+                              NullableTinyIntVector vector,
+                              int batches)
       throws UnknownHostException, IOException {
     BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
+    VectorSchemaRoot root = new VectorSchemaRoot(asList(field), asList((FieldVector) vector), 0);
     try (Socket socket = new Socket("localhost", serverPort);
-        ArrowStreamWriter writer = new ArrowStreamWriter(socket.getOutputStream(), schema);
+        ArrowStreamWriter writer = new ArrowStreamWriter(root, null, socket.getOutputStream());
         ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), alloc)) {
-      for (ArrowRecordBatch batch: batches) {
-        writer.writeRecordBatch(batch);
+      writer.start();
+      for (int i = 0; i < batches; i++) {
+        vector.allocateNew(16);
+        for (int j = 0; j < 8; j++) {
+          vector.getMutator().set(j, j + i);
+          vector.getMutator().set(j + 8, 0, (byte) (j + i));
+        }
+        vector.getMutator().setValueCount(16);
+        root.setRowCount(16);
+        writer.writeBatch();
       }
       writer.end();
 
-      reader.init();
-      assertEquals(schema, reader.getSchema());
-      for (int i = 0; i < batches.size(); i++) {
-        ArrowRecordBatch result = reader.nextRecordBatch();
-        ArrowRecordBatch expected = batches.get(i);
-        assertTrue(result != null);
-        assertEquals(expected.getBuffers().size(), result.getBuffers().size());
-        for (int j = 0; j < expected.getBuffers().size(); j++) {
-          assertTrue(expected.getBuffers().get(j).compareTo(result.getBuffers().get(j)) == 0);
+      assertEquals(new Schema(asList(field)), reader.getVectorSchemaRoot().getSchema());
+
+      NullableTinyIntVector readVector = (NullableTinyIntVector) reader.getVectorSchemaRoot().getFieldVectors().get(0);
+      for (int i = 0; i < batches; i++) {
+        reader.loadNextBatch();
+        assertEquals(16, reader.getVectorSchemaRoot().getRowCount());
+        assertEquals(16, readVector.getAccessor().getValueCount());
+        for (int j = 0; j < 8; j++) {
+          assertEquals(j + i, readVector.getAccessor().get(j));
+          assertTrue(readVector.getAccessor().isNull(j + 8));
         }
       }
-      ArrowRecordBatch result = reader.nextRecordBatch();
-      assertTrue(result == null);
+      reader.loadNextBatch();
+      assertEquals(0, reader.getVectorSchemaRoot().getRowCount());
       assertEquals(reader.bytesRead(), writer.bytesWritten());
     }
   }
 
   @Test
   public void basicTest() throws InterruptedException, IOException {
-    final EchoServer server = new EchoServer(0);
-    int serverPort = server.port();
-    Thread serverThread = new Thread() {
-      @Override
-      public void run() {
-        try {
-          server.run();
-        } catch (IOException e) {
-          e.printStackTrace();
-        }
-      }
-    };
-    serverThread.start();
-
     BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
-    byte[] validity = new byte[] { (byte)255, 0};
-    byte[] values = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
-    ArrowBuf validityb = buf(alloc, validity);
-    ArrowBuf valuesb =  buf(alloc, values);
-    ArrowRecordBatch batch = new ArrowRecordBatch(
-        16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb));
 
-    Schema schema = new Schema(asList(new Field(
-        "testField", true, new ArrowType.Int(8, true), Collections.<Field>emptyList())));
+    Field field = new Field("testField", true, new ArrowType.Int(8, true), Collections.<Field>emptyList());
+    NullableTinyIntVector vector = new NullableTinyIntVector("testField", alloc, null);
+    Schema schema = new Schema(asList(field));
 
     // Try an empty stream, just the header.
-    testEchoServer(serverPort, schema, new ArrayList<ArrowRecordBatch>());
+    testEchoServer(serverPort, field, vector, 0);
 
     // Try with one batch.
-    List<ArrowRecordBatch> batches = new ArrayList<>();
-    batches.add(batch);
-    testEchoServer(serverPort, schema, batches);
+    testEchoServer(serverPort, field, vector, 1);
 
     // Try with a few
-    for (int i = 0; i < 10; i++) {
-      batches.add(batch);
+    testEchoServer(serverPort, field, vector, 10);
+  }
+
+  @Test
+  public void testFlatDictionary() throws IOException {
+    DictionaryEncoding writeEncoding = new DictionaryEncoding(1L, false, null);
+    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+         NullableIntVector writeVector = new NullableIntVector("varchar", allocator, writeEncoding);
+         NullableVarCharVector writeDictionaryVector = new NullableVarCharVector("dict", allocator, null)) {
+      writeVector.allocateNewSafe();
+      NullableIntVector.Mutator mutator = writeVector.getMutator();
+      mutator.set(0, 0);
+      mutator.set(1, 1);
+      mutator.set(3, 2);
+      mutator.set(4, 1);
+      mutator.set(5, 2);
+      mutator.setValueCount(6);
+
+      writeDictionaryVector.allocateNewSafe();
+      NullableVarCharVector.Mutator dictionaryMutator = writeDictionaryVector.getMutator();
+      dictionaryMutator.set(0, "foo".getBytes(StandardCharsets.UTF_8));
+      dictionaryMutator.set(1, "bar".getBytes(StandardCharsets.UTF_8));
+      dictionaryMutator.set(2, "baz".getBytes(StandardCharsets.UTF_8));
+      dictionaryMutator.setValueCount(3);
+
+      List<Field> fields = ImmutableList.of(writeVector.getField());
+      List<FieldVector> vectors = ImmutableList.of((FieldVector) writeVector);
+      VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 6);
+
+      DictionaryProvider writeProvider = new MapDictionaryProvider(new Dictionary(writeDictionaryVector, writeEncoding));
+
+      try (Socket socket = new Socket("localhost", serverPort);
+           ArrowStreamWriter writer = new ArrowStreamWriter(root, writeProvider, socket.getOutputStream());
+           ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) {
+        writer.start();
+        writer.writeBatch();
+        writer.end();
+
+        reader.loadNextBatch();
+        VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
+        Assert.assertEquals(6, readerRoot.getRowCount());
+
+        FieldVector readVector = readerRoot.getFieldVectors().get(0);
+        Assert.assertNotNull(readVector);
+
+        DictionaryEncoding readEncoding = readVector.getField().getDictionary();
+        Assert.assertNotNull(readEncoding);
+        Assert.assertEquals(1L, readEncoding.getId());
+
+        FieldVector.Accessor accessor = readVector.getAccessor();
+        Assert.assertEquals(6, accessor.getValueCount());
+        Assert.assertEquals(0, accessor.getObject(0));
+        Assert.assertEquals(1, accessor.getObject(1));
+        Assert.assertEquals(null, accessor.getObject(2));
+        Assert.assertEquals(2, accessor.getObject(3));
+        Assert.assertEquals(1, accessor.getObject(4));
+        Assert.assertEquals(2, accessor.getObject(5));
+
+        Dictionary dictionary = reader.lookup(1L);
+        Assert.assertNotNull(dictionary);
+        NullableVarCharVector.Accessor dictionaryAccessor = ((NullableVarCharVector) dictionary.getVector()).getAccessor();
+        Assert.assertEquals(3, dictionaryAccessor.getValueCount());
+        Assert.assertEquals(new Text("foo"), dictionaryAccessor.getObject(0));
+        Assert.assertEquals(new Text("bar"), dictionaryAccessor.getObject(1));
+        Assert.assertEquals(new Text("baz"), dictionaryAccessor.getObject(2));
+      }
     }
-    testEchoServer(serverPort, schema, batches);
+  }
 
-    server.close();
-    serverThread.join();
+  @Test
+  public void testNestedDictionary() throws IOException {
+    DictionaryEncoding writeEncoding = new DictionaryEncoding(2L, false, null);
+    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+         NullableVarCharVector writeDictionaryVector = new NullableVarCharVector("dictionary", allocator, null);
+         ListVector writeVector = new ListVector("list", allocator, null, null)) {
+
+      // data being written:
+      // [['foo', 'bar'], ['foo'], ['bar']] -> [[0, 1], [0], [1]]
+
+      writeDictionaryVector.allocateNew();
+      writeDictionaryVector.getMutator().set(0, "foo".getBytes(StandardCharsets.UTF_8));
+      writeDictionaryVector.getMutator().set(1, "bar".getBytes(StandardCharsets.UTF_8));
+      writeDictionaryVector.getMutator().setValueCount(2);
+
+      writeVector.addOrGetVector(MinorType.INT, writeEncoding);
+      writeVector.allocateNew();
+      UnionListWriter listWriter = new UnionListWriter(writeVector);
+      listWriter.startList();
+      listWriter.writeInt(0);
+      listWriter.writeInt(1);
+      listWriter.endList();
+      listWriter.startList();
+      listWriter.writeInt(0);
+      listWriter.endList();
+      listWriter.startList();
+      listWriter.writeInt(1);
+      listWriter.endList();
+      listWriter.setValueCount(3);
+
+      List<Field> fields = ImmutableList.of(writeVector.getField());
+      List<FieldVector> vectors = ImmutableList.of((FieldVector) writeVector);
+      VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 3);
+
+      DictionaryProvider writeProvider = new MapDictionaryProvider(new Dictionary(writeDictionaryVector, writeEncoding));
+
+      try (Socket socket = new Socket("localhost", serverPort);
+           ArrowStreamWriter writer = new ArrowStreamWriter(root, writeProvider, socket.getOutputStream());
+           ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) {
+        writer.start();
+        writer.writeBatch();
+        writer.end();
+
+        reader.loadNextBatch();
+        VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
+        Assert.assertEquals(3, readerRoot.getRowCount());
+
+        ListVector readVector = (ListVector) readerRoot.getFieldVectors().get(0);
+        Assert.assertNotNull(readVector);
+
+        Assert.assertNull(readVector.getField().getDictionary());
+        DictionaryEncoding readEncoding = readVector.getField().getChildren().get(0).getDictionary();
+        Assert.assertNotNull(readEncoding);
+        Assert.assertEquals(2L, readEncoding.getId());
+
+        Field nestedField = readVector.getField().getChildren().get(0);
+
+        DictionaryEncoding encoding = nestedField.getDictionary();
+        Assert.assertNotNull(encoding);
+        Assert.assertEquals(2L, encoding.getId());
+        Assert.assertEquals(new Int(32, true), encoding.getIndexType());
+
+        ListVector.Accessor accessor = readVector.getAccessor();
+        Assert.assertEquals(3, accessor.getValueCount());
+        Assert.assertEquals(Arrays.asList(0, 1), accessor.getObject(0));
+        Assert.assertEquals(Arrays.asList(0), accessor.getObject(1));
+        Assert.assertEquals(Arrays.asList(1), accessor.getObject(2));
+
+        Dictionary readDictionary = reader.lookup(2L);
+        Assert.assertNotNull(readDictionary);
+        NullableVarCharVector.Accessor dictionaryAccessor = ((NullableVarCharVector) readDictionary.getVector()).getAccessor();
+        Assert.assertEquals(2, dictionaryAccessor.getValueCount());
+        Assert.assertEquals(new Text("foo"), dictionaryAccessor.getObject(0));
+        Assert.assertEquals(new Text("bar"), dictionaryAccessor.getObject(1));
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java
----------------------------------------------------------------------
diff --git a/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java b/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java
index 0ae32be..9d4ef5c 100644
--- a/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java
+++ b/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java
@@ -33,6 +33,11 @@ import java.io.IOException;
 import java.io.StringReader;
 import java.util.Map;
 
+import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
+import com.fasterxml.jackson.core.util.DefaultPrettyPrinter.NopIndenter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.tools.Integration.Command;
@@ -49,11 +54,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
-import com.fasterxml.jackson.core.util.DefaultPrettyPrinter.NopIndenter;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-
 public class TestIntegration {
 
   @Rule
@@ -128,6 +128,34 @@ public class TestIntegration {
     }
   }
 
+  @Test
+  public void testJSONRoundTripWithStruct() throws Exception {
+    File testJSONFile = new File("../../integration/data/struct_example.json");
+    File testOutFile = testFolder.newFile("testOutStruct.arrow");
+    File testRoundTripJSONFile = testFolder.newFile("testOutStruct.json");
+    testOutFile.delete();
+    testRoundTripJSONFile.delete();
+
+    Integration integration = new Integration();
+
+    // convert to arrow
+    String[] args1 = { "-arrow", testOutFile.getAbsolutePath(), "-json",  testJSONFile.getAbsolutePath(), "-command", Command.JSON_TO_ARROW.name()};
+    integration.run(args1);
+
+    // convert back to json
+    String[] args2 = { "-arrow", testOutFile.getAbsolutePath(), "-json",  testRoundTripJSONFile.getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()};
+    integration.run(args2);
+
+    BufferedReader orig = readNormalized(testJSONFile);
+    BufferedReader rt = readNormalized(testRoundTripJSONFile);
+    String i, o;
+    int j = 0;
+    while ((i = orig.readLine()) != null && (o = rt.readLine()) != null) {
+      assertEquals("line: " + j, i, o);
+      ++j;
+    }
+  }
+
   private ObjectMapper om = new ObjectMapper();
   {
     DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter();

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/tmptestfilesio
----------------------------------------------------------------------
diff --git a/java/tools/tmptestfilesio b/java/tools/tmptestfilesio
new file mode 100644
index 0000000..d1b6b6c
Binary files /dev/null and b/java/tools/tmptestfilesio differ

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/codegen/templates/MapWriters.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/codegen/templates/MapWriters.java b/java/vector/src/main/codegen/templates/MapWriters.java
index 4af6eee..428ce04 100644
--- a/java/vector/src/main/codegen/templates/MapWriters.java
+++ b/java/vector/src/main/codegen/templates/MapWriters.java
@@ -64,7 +64,7 @@ public class ${mode}MapWriter extends AbstractFieldWriter {
         list(child.getName());
         break;
       case UNION:
-        UnionWriter writer = new UnionWriter(container.addOrGet(child.getName(), MinorType.UNION, UnionVector.class), getNullableMapWriterFactory());
+        UnionWriter writer = new UnionWriter(container.addOrGet(child.getName(), MinorType.UNION, UnionVector.class, null), getNullableMapWriterFactory());
         fields.put(handleCase(child.getName()), writer);
         break;
 <#list vv.types as type><#list type.minor as minor>
@@ -113,7 +113,7 @@ public class ${mode}MapWriter extends AbstractFieldWriter {
     FieldWriter writer = fields.get(finalName);
     if(writer == null){
       int vectorCount=container.size();
-      NullableMapVector vector = container.addOrGet(name, MinorType.MAP, NullableMapVector.class);
+      NullableMapVector vector = container.addOrGet(name, MinorType.MAP, NullableMapVector.class, null);
       writer = new PromotableWriter(vector, container, getNullableMapWriterFactory());
       if(vectorCount != container.size()) {
         writer.allocate();
@@ -157,7 +157,7 @@ public class ${mode}MapWriter extends AbstractFieldWriter {
     FieldWriter writer = fields.get(finalName);
     int vectorCount = container.size();
     if(writer == null) {
-      writer = new PromotableWriter(container.addOrGet(name, MinorType.LIST, ListVector.class), container, getNullableMapWriterFactory());
+      writer = new PromotableWriter(container.addOrGet(name, MinorType.LIST, ListVector.class, null), container, getNullableMapWriterFactory());
       if (container.size() > vectorCount) {
         writer.allocate();
       }
@@ -222,7 +222,7 @@ public class ${mode}MapWriter extends AbstractFieldWriter {
     if(writer == null) {
       ValueVector vector;
       ValueVector currentVector = container.getChild(name);
-      ${vectName}Vector v = container.addOrGet(name, MinorType.${upperName}, ${vectName}Vector.class<#if minor.class == "Decimal"> , new int[] {precision, scale}</#if>);
+      ${vectName}Vector v = container.addOrGet(name, MinorType.${upperName}, ${vectName}Vector.class, null<#if minor.class == "Decimal"> , new int[] {precision, scale}</#if>);
       writer = new PromotableWriter(v, container, getNullableMapWriterFactory());
       vector = v;
       if (currentVector == null || currentVector != vector) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/codegen/templates/NullableValueVectors.java b/java/vector/src/main/codegen/templates/NullableValueVectors.java
index 6b25fb3..b3e10e3 100644
--- a/java/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/java/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -65,21 +65,21 @@ public final class ${className} extends BaseDataValueVector implements <#if type
   private final int precision;
   private final int scale;
 
-  public ${className}(String name, BufferAllocator allocator, int precision, int scale) {
+  public ${className}(String name, BufferAllocator allocator, DictionaryEncoding dictionary, int precision, int scale) {
     super(name, allocator);
     values = new ${valuesName}(valuesField, allocator, precision, scale);
     this.precision = precision;
     this.scale = scale;
     mutator = new Mutator();
     accessor = new Accessor();
-    field = new Field(name, true, new Decimal(precision, scale), null);
+    field = new Field(name, true, new Decimal(precision, scale), dictionary, null);
     innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList(
         bits,
         values
     ));
   }
   <#else>
-  public ${className}(String name, BufferAllocator allocator) {
+  public ${className}(String name, BufferAllocator allocator, DictionaryEncoding dictionary) {
     super(name, allocator);
     values = new ${valuesName}(valuesField, allocator);
     mutator = new Mutator();
@@ -88,38 +88,38 @@ public final class ${className} extends BaseDataValueVector implements <#if type
         minor.class == "SmallInt" ||
         minor.class == "Int" ||
         minor.class == "BigInt">
-    field = new Field(name, true, new Int(${type.width} * 8, true), null);
+    field = new Field(name, true, new Int(${type.width} * 8, true), dictionary, null);
   <#elseif minor.class == "UInt1" ||
         minor.class == "UInt2" ||
         minor.class == "UInt4" ||
         minor.class == "UInt8">
-    field = new Field(name, true, new Int(${type.width} * 8, false), null);
+    field = new Field(name, true, new Int(${type.width} * 8, false), dictionary, null);
   <#elseif minor.class == "Date">
-    field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Date(), null);
+    field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Date(), dictionary, null);
   <#elseif minor.class == "Time">
-    field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Time(), null);
+    field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Time(), dictionary, null);
   <#elseif minor.class == "Float4">
-    field = new Field(name, true, new FloatingPoint(org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE), null);
+    field = new Field(name, true, new FloatingPoint(org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE), dictionary, null);
   <#elseif minor.class == "Float8">
-    field = new Field(name, true, new FloatingPoint(org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE), null);
+    field = new Field(name, true, new FloatingPoint(org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE), dictionary, null);
   <#elseif minor.class == "TimeStampSec">
-    field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.SECOND), null);
+    field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.SECOND), dictionary, null);
   <#elseif minor.class == "TimeStampMilli">
-    field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.MILLISECOND), null);
+    field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.MILLISECOND), dictionary, null);
   <#elseif minor.class == "TimeStampMicro">
-    field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.MICROSECOND), null);
+    field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.MICROSECOND), dictionary, null);
   <#elseif minor.class == "TimeStampNano">
-    field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.NANOSECOND), null);
+    field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.NANOSECOND), dictionary, null);
   <#elseif minor.class == "IntervalDay">
-    field = new Field(name, true, new Interval(org.apache.arrow.vector.types.IntervalUnit.DAY_TIME), null);
+    field = new Field(name, true, new Interval(org.apache.arrow.vector.types.IntervalUnit.DAY_TIME), dictionary, null);
   <#elseif minor.class == "IntervalYear">
-    field = new Field(name, true, new Interval(org.apache.arrow.vector.types.IntervalUnit.YEAR_MONTH), null);
+    field = new Field(name, true, new Interval(org.apache.arrow.vector.types.IntervalUnit.YEAR_MONTH), dictionary, null);
   <#elseif minor.class == "VarChar">
-    field = new Field(name, true, new Utf8(), null);
+    field = new Field(name, true, new Utf8(), dictionary, null);
   <#elseif minor.class == "VarBinary">
-    field = new Field(name, true, new Binary(), null);
+    field = new Field(name, true, new Binary(), dictionary, null);
   <#elseif minor.class == "Bit">
-    field = new Field(name, true, new Bool(), null);
+    field = new Field(name, true, new Bool(), dictionary, null);
   </#if>
     innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList(
         bits,
@@ -378,9 +378,9 @@ public final class ${className} extends BaseDataValueVector implements <#if type
 
     public TransferImpl(String name, BufferAllocator allocator){
       <#if minor.class == "Decimal">
-      to = new ${className}(name, allocator, precision, scale);
+      to = new ${className}(name, allocator, field.getDictionary(), precision, scale);
       <#else>
-      to = new ${className}(name, allocator);
+      to = new ${className}(name, allocator, field.getDictionary());
       </#if>
     }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/codegen/templates/UnionVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/codegen/templates/UnionVector.java b/java/vector/src/main/codegen/templates/UnionVector.java
index 1a6908d..076ed93 100644
--- a/java/vector/src/main/codegen/templates/UnionVector.java
+++ b/java/vector/src/main/codegen/templates/UnionVector.java
@@ -118,11 +118,11 @@ public class UnionVector implements FieldVector {
   public List<BufferBacked> getFieldInnerVectors() {
      return this.innerVectors;
   }
-  
+
   public NullableMapVector getMap() {
     if (mapVector == null) {
       int vectorCount = internalMap.size();
-      mapVector = internalMap.addOrGet("map", MinorType.MAP, NullableMapVector.class);
+      mapVector = internalMap.addOrGet("map", MinorType.MAP, NullableMapVector.class, null);
       if (internalMap.size() > vectorCount) {
         mapVector.allocateNew();
         if (callBack != null) {
@@ -144,7 +144,7 @@ public class UnionVector implements FieldVector {
   public Nullable${name}Vector get${name}Vector() {
     if (${uncappedName}Vector == null) {
       int vectorCount = internalMap.size();
-      ${uncappedName}Vector = internalMap.addOrGet("${lowerCaseName}", MinorType.${name?upper_case}, Nullable${name}Vector.class);
+      ${uncappedName}Vector = internalMap.addOrGet("${lowerCaseName}", MinorType.${name?upper_case}, Nullable${name}Vector.class, null);
       if (internalMap.size() > vectorCount) {
         ${uncappedName}Vector.allocateNew();
         if (callBack != null) {
@@ -162,7 +162,7 @@ public class UnionVector implements FieldVector {
   public ListVector getList() {
     if (listVector == null) {
       int vectorCount = internalMap.size();
-      listVector = internalMap.addOrGet("list", MinorType.LIST, ListVector.class);
+      listVector = internalMap.addOrGet("list", MinorType.LIST, ListVector.class, null);
       if (internalMap.size() > vectorCount) {
         listVector.allocateNew();
         if (callBack != null) {
@@ -262,7 +262,7 @@ public class UnionVector implements FieldVector {
   public FieldVector addVector(FieldVector v) {
     String name = v.getMinorType().name().toLowerCase();
     Preconditions.checkState(internalMap.getChild(name) == null, String.format("%s vector already exists", name));
-    final FieldVector newVector = internalMap.addOrGet(name, v.getMinorType(), v.getClass());
+    final FieldVector newVector = internalMap.addOrGet(name, v.getMinorType(), v.getClass(), v.getField().getDictionary());
     v.makeTransferPair(newVector).transfer();
     internalMap.putChild(name, newVector);
     if (callBack != null) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java
index d1e9abe..179f2ee 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java
@@ -81,6 +81,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
     } else {
       super.load(fieldNode, data);
     }
+    this.valueCount = fieldNode.getLength();
   }
 
   @Override
@@ -451,7 +452,6 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
 
     /**
      * set count bits to 1 in data starting at firstBitIndex
-     * @param data the buffer to set
      * @param firstBitIndex the index of the first bit to set
      * @param count the number of bits to set
      */

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java b/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
index b28433c..0fdbc48 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
@@ -19,11 +19,10 @@ package org.apache.arrow.vector;
 
 import java.util.List;
 
+import io.netty.buffer.ArrowBuf;
 import org.apache.arrow.vector.schema.ArrowFieldNode;
 import org.apache.arrow.vector.types.pojo.Field;
 
-import io.netty.buffer.ArrowBuf;
-
 /**
  * A vector corresponding to a Field in the schema
  * It has inner vectors backed by buffers (validity, offsets, data, ...)
@@ -61,5 +60,4 @@ public interface FieldVector extends ValueVector {
    * @return the inner vectors for this field as defined by the TypeLayout
    */
   List<BufferBacked> getFieldInnerVectors();
-
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
index 5c1176c..76de250 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
@@ -36,15 +36,14 @@ import io.netty.buffer.ArrowBuf;
  * Loads buffers into vectors
  */
 public class VectorLoader {
+
   private final VectorSchemaRoot root;
 
   /**
    * will create children in root based on schema
-   * @param schema the expected schema
    * @param root the root to add vectors to based on schema
    */
   public VectorLoader(VectorSchemaRoot root) {
-    super();
     this.root = root;
   }
 
@@ -57,18 +56,16 @@ public class VectorLoader {
     Iterator<ArrowBuf> buffers = recordBatch.getBuffers().iterator();
     Iterator<ArrowFieldNode> nodes = recordBatch.getNodes().iterator();
     List<Field> fields = root.getSchema().getFields();
-    for (int i = 0; i < fields.size(); ++i) {
-      Field field = fields.get(i);
+    for (Field field: fields) {
       FieldVector fieldVector = root.getVector(field.getName());
       loadBuffers(fieldVector, field, buffers, nodes);
     }
     root.setRowCount(recordBatch.getLength());
     if (nodes.hasNext() || buffers.hasNext()) {
-      throw new IllegalArgumentException("not all nodes and buffers where consumed. nodes: " + Iterators.toString(nodes) + " buffers: " + Iterators.toString(buffers));
+      throw new IllegalArgumentException("not all nodes and buffers were consumed. nodes: " + Iterators.toString(nodes) + " buffers: " + Iterators.toString(buffers));
     }
   }
 
-
   private void loadBuffers(FieldVector vector, Field field, Iterator<ArrowBuf> buffers, Iterator<ArrowFieldNode> nodes) {
     checkArgument(nodes.hasNext(),
         "no more field nodes for for field " + field + " and vector " + vector);
@@ -82,7 +79,7 @@ public class VectorLoader {
       vector.loadFieldBuffers(fieldNode, ownBuffers);
     } catch (RuntimeException e) {
       throw new IllegalArgumentException("Could not load buffers for field " +
-              field + ". error message: " + e.getMessage(), e);
+            field + ". error message: " + e.getMessage(), e);
     }
     List<Field> children = field.getChildren();
     if (children.size() > 0) {
@@ -96,4 +93,4 @@ public class VectorLoader {
     }
   }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java
index 1cbe187..7e626fb 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java
@@ -18,7 +18,6 @@
 package org.apache.arrow.vector;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,6 +28,9 @@ import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
 
+/**
+ * Holder for a set of vectors to be loaded/unloaded
+ */
 public class VectorSchemaRoot implements AutoCloseable {
 
   private final Schema schema;
@@ -37,9 +39,17 @@ public class VectorSchemaRoot implements AutoCloseable {
   private final Map<String, FieldVector> fieldVectorsMap = new HashMap<>();
 
   public VectorSchemaRoot(FieldVector parent) {
-    this.schema = new Schema(parent.getField().getChildren());
-    this.rowCount = parent.getAccessor().getValueCount();
-    this.fieldVectors = parent.getChildrenFromFields();
+    this(parent.getField().getChildren(), parent.getChildrenFromFields(), parent.getAccessor().getValueCount());
+  }
+
+  public VectorSchemaRoot(List<Field> fields, List<FieldVector> fieldVectors, int rowCount) {
+    if (fields.size() != fieldVectors.size()) {
+      throw new IllegalArgumentException("Fields must match field vectors. Found " +
+          fieldVectors.size() + " vectors and " + fields.size() + " fields");
+    }
+    this.schema = new Schema(fields);
+    this.rowCount = rowCount;
+    this.fieldVectors = fieldVectors;
     for (int i = 0; i < schema.getFields().size(); ++i) {
       Field field = schema.getFields().get(i);
       FieldVector vector = fieldVectors.get(i);
@@ -47,21 +57,19 @@ public class VectorSchemaRoot implements AutoCloseable {
     }
   }
 
-  public VectorSchemaRoot(Schema schema, BufferAllocator allocator) {
-    super();
-    this.schema = schema;
+  public static VectorSchemaRoot create(Schema schema, BufferAllocator allocator) {
     List<FieldVector> fieldVectors = new ArrayList<>();
     for (Field field : schema.getFields()) {
       MinorType minorType = Types.getMinorTypeForArrowType(field.getType());
-      FieldVector vector = minorType.getNewVector(field.getName(), allocator, null);
+      FieldVector vector = minorType.getNewVector(field.getName(), allocator, field.getDictionary(), null);
       vector.initializeChildrenFromFields(field.getChildren());
       fieldVectors.add(vector);
-      fieldVectorsMap.put(field.getName(), vector);
     }
-    this.fieldVectors = Collections.unmodifiableList(fieldVectors);
-    if (this.fieldVectors.size() != schema.getFields().size()) {
-      throw new IllegalArgumentException("The root vector did not create the right number of children. found " + fieldVectors.size() + " expected " + schema.getFields().size());
+    if (fieldVectors.size() != schema.getFields().size()) {
+      throw new IllegalArgumentException("The root vector did not create the right number of children. found " +
+        fieldVectors.size() + " expected " + schema.getFields().size());
     }
+    return new VectorSchemaRoot(schema.getFields(), fieldVectors, 0);
   }
 
   public List<FieldVector> getFieldVectors() {

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
index 92d8cb0..8e9ff6d 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
@@ -20,42 +20,27 @@ package org.apache.arrow.vector;
 import java.util.ArrayList;
 import java.util.List;
 
+import io.netty.buffer.ArrowBuf;
 import org.apache.arrow.vector.ValueVector.Accessor;
 import org.apache.arrow.vector.schema.ArrowFieldNode;
 import org.apache.arrow.vector.schema.ArrowRecordBatch;
 import org.apache.arrow.vector.schema.ArrowVectorType;
-import org.apache.arrow.vector.types.pojo.Schema;
-
-import io.netty.buffer.ArrowBuf;
 
 public class VectorUnloader {
 
-  private final Schema schema;
-  private final int valueCount;
-  private final List<FieldVector> vectors;
-
-  public VectorUnloader(Schema schema, int valueCount, List<FieldVector> vectors) {
-    super();
-    this.schema = schema;
-    this.valueCount = valueCount;
-    this.vectors = vectors;
-  }
+  private final VectorSchemaRoot root;
 
   public VectorUnloader(VectorSchemaRoot root) {
-    this(root.getSchema(), root.getRowCount(), root.getFieldVectors());
-  }
-
-  public Schema getSchema() {
-    return schema;
+    this.root = root;
   }
 
   public ArrowRecordBatch getRecordBatch() {
     List<ArrowFieldNode> nodes = new ArrayList<>();
     List<ArrowBuf> buffers = new ArrayList<>();
-    for (FieldVector vector : vectors) {
+    for (FieldVector vector : root.getFieldVectors()) {
       appendNodes(vector, nodes, buffers);
     }
-    return new ArrowRecordBatch(valueCount, nodes, buffers);
+    return new ArrowRecordBatch(root.getRowCount(), nodes, buffers);
   }
 
   private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers) {
@@ -74,4 +59,4 @@ public class VectorUnloader {
     }
   }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
index 2f68886..86a5e82 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
@@ -22,6 +22,7 @@ import org.apache.arrow.memory.OutOfMemoryException;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
 import org.apache.arrow.vector.util.CallBack;
 
 /**
@@ -85,7 +86,7 @@ public abstract class AbstractContainerVector implements ValueVector {
   public abstract int size();
 
   // add a new vector with the input MajorType or return the existing vector if we already added one with the same type
-  public abstract <T extends FieldVector> T addOrGet(String name, MinorType minorType, Class<T> clazz, int... precisionScale);
+  public abstract <T extends FieldVector> T addOrGet(String name, MinorType minorType, Class<T> clazz, DictionaryEncoding dictionary, int... precisionScale);
 
   // return the child vector with the input name
   public abstract <T extends FieldVector> T getChild(String name, Class<T> clazz);

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java
index f030d16..baeeb07 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java
@@ -26,6 +26,7 @@ import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
 import org.apache.arrow.vector.util.CallBack;
 import org.apache.arrow.vector.util.MapWithOrdinal;
 
@@ -110,7 +111,7 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
    * @return resultant {@link org.apache.arrow.vector.ValueVector}
    */
   @Override
-  public <T extends FieldVector> T addOrGet(String name, MinorType minorType, Class<T> clazz, int... precisionScale) {
+  public <T extends FieldVector> T addOrGet(String name, MinorType minorType, Class<T> clazz, DictionaryEncoding dictionary, int... precisionScale) {
     final ValueVector existing = getChild(name);
     boolean create = false;
     if (existing == null) {
@@ -122,7 +123,7 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
       create = true;
     }
     if (create) {
-      final T vector = clazz.cast(minorType.getNewVector(name, allocator, callBack, precisionScale));
+      final T vector = clazz.cast(minorType.getNewVector(name, allocator, dictionary, callBack, precisionScale));
       putChild(name, vector);
       if (callBack!=null) {
         callBack.doWork();
@@ -162,12 +163,12 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
     return typeify(v, clazz);
   }
 
-  protected ValueVector add(String name, MinorType minorType, int... precisionScale) {
+  protected ValueVector add(String name, MinorType minorType, DictionaryEncoding dictionary, int... precisionScale) {
     final ValueVector existing = getChild(name);
     if (existing != null) {
       throw new IllegalStateException(String.format("Vector already exists: Existing[%s], Requested[%s] ", existing.getClass().getSimpleName(), minorType));
     }
-    FieldVector vector = minorType.getNewVector(name, allocator, callBack, precisionScale);
+    FieldVector vector = minorType.getNewVector(name, allocator, dictionary, callBack, precisionScale);
     putChild(name, vector);
     if (callBack!=null) {
       callBack.doWork();

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
index 7424df4..eeb8f58 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
@@ -28,6 +28,7 @@ import org.apache.arrow.vector.UInt4Vector;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.ZeroVector;
 import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
 import org.apache.arrow.vector.util.SchemaChangeRuntimeException;
 
 import com.google.common.base.Preconditions;
@@ -150,10 +151,10 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements
     return vector == DEFAULT_DATA_VECTOR ? 0:1;
   }
 
-  public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(MinorType minorType) {
+  public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(MinorType minorType, DictionaryEncoding dictionary) {
     boolean created = false;
     if (vector instanceof ZeroVector) {
-      vector = minorType.getNewVector(DATA_VECTOR_NAME, allocator, null);
+      vector = minorType.getNewVector(DATA_VECTOR_NAME, allocator, dictionary, null);
       // returned vector must have the same field
       created = true;
     }

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/complex/DictionaryVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/DictionaryVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/DictionaryVector.java
deleted file mode 100644
index 84760ea..0000000
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/DictionaryVector.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*******************************************************************************
-
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.arrow.vector.complex;
-
-import io.netty.buffer.ArrowBuf;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.OutOfMemoryException;
-import org.apache.arrow.vector.NullableIntVector;
-import org.apache.arrow.vector.ValueVector;
-import org.apache.arrow.vector.complex.reader.FieldReader;
-import org.apache.arrow.vector.types.Dictionary;
-import org.apache.arrow.vector.types.Types.MinorType;
-import org.apache.arrow.vector.types.pojo.Field;
-import org.apache.arrow.vector.util.TransferPair;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-public class DictionaryVector implements ValueVector {
-
-  private ValueVector indices;
-  private Dictionary dictionary;
-
-  public DictionaryVector(ValueVector indices, Dictionary dictionary) {
-    this.indices = indices;
-    this.dictionary = dictionary;
-  }
-
-  /**
-   * Dictionary encodes a vector. The dictionary will be built using the values from the vector.
-   *
-   * @param vector vector to encode
-   * @return dictionary encoded vector
-   */
-  public static DictionaryVector encode(ValueVector vector) {
-    validateType(vector.getMinorType());
-    Map<Object, Integer> lookUps = new HashMap<>();
-    Map<Integer, Integer> transfers = new HashMap<>();
-
-    ValueVector.Accessor accessor = vector.getAccessor();
-    int count = accessor.getValueCount();
-
-    NullableIntVector indices = new NullableIntVector(vector.getField().getName(), vector.getAllocator());
-    indices.allocateNew(count);
-    NullableIntVector.Mutator mutator = indices.getMutator();
-
-    int nextIndex = 0;
-    for (int i = 0; i < count; i++) {
-      Object value = accessor.getObject(i);
-      if (value != null) { // if it's null leave it null
-        Integer index = lookUps.get(value);
-        if (index == null) {
-          index = nextIndex++;
-          lookUps.put(value, index);
-          transfers.put(i, index);
-        }
-        mutator.set(i, index);
-      }
-    }
-    mutator.setValueCount(count);
-
-    // copy the dictionary values into the dictionary vector
-    TransferPair dictionaryTransfer = vector.getTransferPair(vector.getAllocator());
-    ValueVector dictionaryVector = dictionaryTransfer.getTo();
-    dictionaryVector.allocateNewSafe();
-    for (Map.Entry<Integer, Integer> entry: transfers.entrySet()) {
-      dictionaryTransfer.copyValueSafe(entry.getKey(), entry.getValue());
-    }
-    dictionaryVector.getMutator().setValueCount(transfers.size());
-    Dictionary dictionary = new Dictionary(dictionaryVector, false);
-
-    return new DictionaryVector(indices, dictionary);
-  }
-
-  /**
-   * Dictionary encodes a vector with a provided dictionary. The dictionary must contain all values in the vector.
-   *
-   * @param vector vector to encode
-   * @param dictionary dictionary used for encoding
-   * @return dictionary encoded vector
-   */
-  public static DictionaryVector encode(ValueVector vector, Dictionary dictionary) {
-    validateType(vector.getMinorType());
-    // load dictionary values into a hashmap for lookup
-    ValueVector.Accessor dictionaryAccessor = dictionary.getDictionary().getAccessor();
-    Map<Object, Integer> lookUps = new HashMap<>(dictionaryAccessor.getValueCount());
-    for (int i = 0; i < dictionaryAccessor.getValueCount(); i++) {
-      // for primitive array types we need a wrapper that implements equals and hashcode appropriately
-      lookUps.put(dictionaryAccessor.getObject(i), i);
-    }
-
-    // vector to hold our indices (dictionary encoded values)
-    NullableIntVector indices = new NullableIntVector(vector.getField().getName(), vector.getAllocator());
-    NullableIntVector.Mutator mutator = indices.getMutator();
-
-    ValueVector.Accessor accessor = vector.getAccessor();
-    int count = accessor.getValueCount();
-
-    indices.allocateNew(count);
-
-    for (int i = 0; i < count; i++) {
-      Object value = accessor.getObject(i);
-      if (value != null) { // if it's null leave it null
-        // note: this may fail if value was not included in the dictionary
-        mutator.set(i, lookUps.get(value));
-      }
-    }
-    mutator.setValueCount(count);
-
-    return new DictionaryVector(indices, dictionary);
-  }
-
-  /**
-   * Decodes a dictionary encoded array using the provided dictionary.
-   *
-   * @param indices dictionary encoded values, must be int type
-   * @param dictionary dictionary used to decode the values
-   * @return vector with values restored from dictionary
-   */
-  public static ValueVector decode(ValueVector indices, Dictionary dictionary) {
-    ValueVector.Accessor accessor = indices.getAccessor();
-    int count = accessor.getValueCount();
-    ValueVector dictionaryVector = dictionary.getDictionary();
-    // copy the dictionary values into the decoded vector
-    TransferPair transfer = dictionaryVector.getTransferPair(indices.getAllocator());
-    transfer.getTo().allocateNewSafe();
-    for (int i = 0; i < count; i++) {
-      Object index = accessor.getObject(i);
-      if (index != null) {
-        transfer.copyValueSafe(((Number) index).intValue(), i);
-      }
-    }
-
-    ValueVector decoded = transfer.getTo();
-    decoded.getMutator().setValueCount(count);
-    return decoded;
-  }
-
-  private static void validateType(MinorType type) {
-    // byte arrays don't work as keys in our dictionary map - we could wrap them with something to
-    // implement equals and hashcode if we want that functionality
-    if (type == MinorType.VARBINARY || type == MinorType.LIST || type == MinorType.MAP || type == MinorType.UNION) {
-      throw new IllegalArgumentException("Dictionary encoding for complex types not implemented");
-    }
-  }
-
-  public ValueVector getIndexVector() { return indices; }
-
-  public ValueVector getDictionaryVector() { return dictionary.getDictionary(); }
-
-  public Dictionary getDictionary() { return dictionary; }
-
-  @Override
-  public MinorType getMinorType() { return indices.getMinorType(); }
-
-  @Override
-  public Field getField() { return indices.getField(); }
-
-  // note: dictionary vector is not closed, as it may be shared
-  @Override
-  public void close() { indices.close(); }
-
-  @Override
-  public void allocateNew() throws OutOfMemoryException { indices.allocateNew(); }
-
-  @Override
-  public boolean allocateNewSafe() { return indices.allocateNewSafe(); }
-
-  @Override
-  public BufferAllocator getAllocator() { return indices.getAllocator();  }
-
-  @Override
-  public void setInitialCapacity(int numRecords) { indices.setInitialCapacity(numRecords); }
-
-  @Override
-  public int getValueCapacity() { return indices.getValueCapacity(); }
-
-  @Override
-  public int getBufferSize() { return indices.getBufferSize(); }
-
-  @Override
-  public int getBufferSizeFor(int valueCount) { return indices.getBufferSizeFor(valueCount); }
-
-  @Override
-  public Iterator<ValueVector> iterator() {
-    return indices.iterator();
-  }
-
-  @Override
-  public void clear() { indices.clear(); }
-
-  @Override
-  public TransferPair getTransferPair(BufferAllocator allocator) { return indices.getTransferPair(allocator); }
-
-  @Override
-  public TransferPair getTransferPair(String ref, BufferAllocator allocator) { return indices.getTransferPair(ref, allocator); }
-
-  @Override
-  public TransferPair makeTransferPair(ValueVector target) { return indices.makeTransferPair(target); }
-
-  @Override
-  public Accessor getAccessor() { return indices.getAccessor(); }
-
-  @Override
-  public Mutator getMutator() { return indices.getMutator(); }
-
-  @Override
-  public FieldReader getReader() { return indices.getReader(); }
-
-  @Override
-  public ArrowBuf[] getBuffers(boolean clear) { return indices.getBuffers(clear); }
-}


[3/4] arrow git commit: ARROW-542: Adding dictionary encoding to FileWriter

Posted by we...@apache.org.
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
index 074b0aa..a12440e 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
@@ -24,6 +24,10 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ObjectArrays;
+
+import io.netty.buffer.ArrowBuf;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.OutOfMemoryException;
 import org.apache.arrow.vector.AddOrGetResult;
@@ -42,16 +46,12 @@ import org.apache.arrow.vector.complex.writer.FieldWriter;
 import org.apache.arrow.vector.schema.ArrowFieldNode;
 import org.apache.arrow.vector.types.Types;
 import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.util.CallBack;
 import org.apache.arrow.vector.util.JsonStringArrayList;
 import org.apache.arrow.vector.util.TransferPair;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ObjectArrays;
-
-import io.netty.buffer.ArrowBuf;
-
 public class ListVector extends BaseRepeatedValueVector implements FieldVector {
 
   final UInt4Vector offsets;
@@ -62,14 +62,16 @@ public class ListVector extends BaseRepeatedValueVector implements FieldVector {
   private UnionListWriter writer;
   private UnionListReader reader;
   private CallBack callBack;
+  private final DictionaryEncoding dictionary;
 
-  public ListVector(String name, BufferAllocator allocator, CallBack callBack) {
+  public ListVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack) {
     super(name, allocator);
     this.bits = new BitVector("$bits$", allocator);
     this.offsets = getOffsetVector();
     this.innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList(bits, offsets));
     this.writer = new UnionListWriter(this);
     this.reader = new UnionListReader(this);
+    this.dictionary = dictionary;
     this.callBack = callBack;
   }
 
@@ -80,7 +82,7 @@ public class ListVector extends BaseRepeatedValueVector implements FieldVector {
     }
     Field field = children.get(0);
     MinorType minorType = Types.getMinorTypeForArrowType(field.getType());
-    AddOrGetResult<FieldVector> addOrGetVector = addOrGetVector(minorType);
+    AddOrGetResult<FieldVector> addOrGetVector = addOrGetVector(minorType, field.getDictionary());
     if (!addOrGetVector.isCreated()) {
       throw new IllegalArgumentException("Child vector already existed: " + addOrGetVector.getVector());
     }
@@ -151,16 +153,16 @@ public class ListVector extends BaseRepeatedValueVector implements FieldVector {
     TransferPair pairs[] = new TransferPair[3];
 
     public TransferImpl(String name, BufferAllocator allocator) {
-      this(new ListVector(name, allocator, null));
+      this(new ListVector(name, allocator, dictionary, null));
     }
 
     public TransferImpl(ListVector to) {
       this.to = to;
-      to.addOrGetVector(vector.getMinorType());
+      to.addOrGetVector(vector.getMinorType(), vector.getField().getDictionary());
       pairs[0] = offsets.makeTransferPair(to.offsets);
       pairs[1] = bits.makeTransferPair(to.bits);
       if (to.getDataVector() instanceof ZeroVector) {
-        to.addOrGetVector(vector.getMinorType());
+        to.addOrGetVector(vector.getMinorType(), vector.getField().getDictionary());
       }
       pairs[2] = getDataVector().makeTransferPair(to.getDataVector());
     }
@@ -232,8 +234,8 @@ public class ListVector extends BaseRepeatedValueVector implements FieldVector {
     return success;
   }
 
-  public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(MinorType minorType) {
-    AddOrGetResult<T> result = super.addOrGetVector(minorType);
+  public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(MinorType minorType, DictionaryEncoding dictionary) {
+    AddOrGetResult<T> result = super.addOrGetVector(minorType, dictionary);
     reader = new UnionListReader(this);
     return result;
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java
index 31a1bb7..4d750ca 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java
@@ -160,7 +160,7 @@ public class MapVector extends AbstractMapVector {
         // (This is similar to what happens in ScanBatch where the children cannot be added till they are
         // read). To take care of this, we ensure that the hashCode of the MaterializedField does not
         // include the hashCode of the children but is based only on MaterializedField$key.
-        final FieldVector newVector = to.addOrGet(child, vector.getMinorType(), vector.getClass());
+        final FieldVector newVector = to.addOrGet(child, vector.getMinorType(), vector.getClass(), vector.getField().getDictionary());
         if (allocate && to.size() != preSize) {
           newVector.allocateNew();
         }
@@ -314,12 +314,11 @@ public class MapVector extends AbstractMapVector {
   public void initializeChildrenFromFields(List<Field> children) {
     for (Field field : children) {
       MinorType minorType = Types.getMinorTypeForArrowType(field.getType());
-      FieldVector vector = (FieldVector)this.add(field.getName(), minorType);
+      FieldVector vector = (FieldVector)this.add(field.getName(), minorType, field.getDictionary());
       vector.initializeChildrenFromFields(field.getChildren());
     }
   }
 
-
   public List<FieldVector> getChildrenFromFields() {
     return getChildren();
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java
index 5fa3530..bb1fdf8 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java
@@ -34,6 +34,7 @@ import org.apache.arrow.vector.complex.impl.NullableMapReaderImpl;
 import org.apache.arrow.vector.complex.reader.FieldReader;
 import org.apache.arrow.vector.holders.ComplexHolder;
 import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
 import org.apache.arrow.vector.util.CallBack;
 import org.apache.arrow.vector.util.TransferPair;
 
@@ -48,14 +49,16 @@ public class NullableMapVector extends MapVector implements FieldVector {
   protected final BitVector bits;
 
   private final List<BufferBacked> innerVectors;
+  private final DictionaryEncoding dictionary;
 
   private final Accessor accessor;
   private final Mutator mutator;
 
-  public NullableMapVector(String name, BufferAllocator allocator, CallBack callBack) {
+  public NullableMapVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack) {
     super(name, checkNotNull(allocator), callBack);
     this.bits = new BitVector("$bits$", allocator);
     this.innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList(bits));
+    this.dictionary = dictionary;
     this.accessor = new Accessor();
     this.mutator = new Mutator();
   }
@@ -83,7 +86,7 @@ public class NullableMapVector extends MapVector implements FieldVector {
 
   @Override
   public TransferPair getTransferPair(BufferAllocator allocator) {
-    return new NullableMapTransferPair(this, new NullableMapVector(name, allocator, callBack), false);
+    return new NullableMapTransferPair(this, new NullableMapVector(name, allocator, dictionary, callBack), false);
   }
 
   @Override
@@ -93,7 +96,7 @@ public class NullableMapVector extends MapVector implements FieldVector {
 
   @Override
   public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
-    return new NullableMapTransferPair(this, new NullableMapVector(ref, allocator, callBack), false);
+    return new NullableMapTransferPair(this, new NullableMapVector(ref, allocator, dictionary, callBack), false);
   }
 
   protected class NullableMapTransferPair extends MapTransferPair {

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java
index dbdd205..6d05316 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java
@@ -149,7 +149,8 @@ public class ComplexWriterImpl extends AbstractFieldWriter implements ComplexWri
     switch(mode){
 
     case INIT:
-      NullableMapVector map = container.addOrGet(name, MinorType.MAP, NullableMapVector.class);
+      // TODO allow dictionaries in complex types
+      NullableMapVector map = container.addOrGet(name, MinorType.MAP, NullableMapVector.class, null);
       mapRoot = nullableMapWriterFactory.build(map);
       mapRoot.setPosition(idx());
       mode = Mode.MAP;
@@ -180,7 +181,8 @@ public class ComplexWriterImpl extends AbstractFieldWriter implements ComplexWri
 
     case INIT:
       int vectorCount = container.size();
-      ListVector listVector = container.addOrGet(name, MinorType.LIST, ListVector.class);
+      // TODO allow dictionaries in complex types
+      ListVector listVector = container.addOrGet(name, MinorType.LIST, ListVector.class, null);
       if (container.size() > vectorCount) {
         listVector.allocateNew();
       }

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java
index 1f7253b..e33319a 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java
@@ -125,7 +125,7 @@ public class PromotableWriter extends AbstractPromotableFieldWriter {
         // ???
         return null;
       }
-      ValueVector v = listVector.addOrGetVector(type).getVector();
+      ValueVector v = listVector.addOrGetVector(type, null).getVector();
       v.allocateNew();
       setWriter(v);
       writer.setPosition(position);
@@ -150,7 +150,8 @@ public class PromotableWriter extends AbstractPromotableFieldWriter {
     TransferPair tp = vector.getTransferPair(vector.getMinorType().name().toLowerCase(), vector.getAllocator());
     tp.transfer();
     if (parentContainer != null) {
-      unionVector = parentContainer.addOrGet(name, MinorType.UNION, UnionVector.class);
+      // TODO allow dictionaries in complex types
+      unionVector = parentContainer.addOrGet(name, MinorType.UNION, UnionVector.class, null);
       unionVector.allocateNew();
     } else if (listVector != null) {
       unionVector = listVector.promoteToUnion();

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/dictionary/Dictionary.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/dictionary/Dictionary.java b/java/vector/src/main/java/org/apache/arrow/vector/dictionary/Dictionary.java
new file mode 100644
index 0000000..0c1cadf
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/dictionary/Dictionary.java
@@ -0,0 +1,66 @@
+/*******************************************************************************
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.arrow.vector.dictionary;
+
+import java.util.Objects;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
+
+public class Dictionary {
+
+  private final DictionaryEncoding encoding;
+  private final FieldVector dictionary;
+
+  public Dictionary(FieldVector dictionary, DictionaryEncoding encoding) {
+    this.dictionary = dictionary;
+    this.encoding = encoding;
+  }
+
+  public FieldVector getVector() {
+    return dictionary;
+  }
+
+  public DictionaryEncoding getEncoding() {
+    return encoding;
+  }
+
+  public ArrowType getVectorType() {
+    return dictionary.getField().getType();
+  }
+
+  @Override
+  public String toString() {
+    return "Dictionary " + encoding + " " + dictionary;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    Dictionary that = (Dictionary) o;
+    return Objects.equals(encoding, that.encoding) && Objects.equals(dictionary, that.dictionary);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(encoding, dictionary);
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryEncoder.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryEncoder.java b/java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryEncoder.java
new file mode 100644
index 0000000..0666bc4
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryEncoder.java
@@ -0,0 +1,144 @@
+/*******************************************************************************
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.arrow.vector.dictionary;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.util.TransferPair;
+
+public class DictionaryEncoder {
+
+  // TODO recursively examine fields?
+
+  /**
+   * Dictionary encodes a vector with a provided dictionary. The dictionary must contain all values in the vector.
+   *
+   * @param vector vector to encode
+   * @param dictionary dictionary used for encoding
+   * @return dictionary encoded vector
+   */
+  public static ValueVector encode(ValueVector vector, Dictionary dictionary) {
+    validateType(vector.getMinorType());
+    // load dictionary values into a hashmap for lookup
+    ValueVector.Accessor dictionaryAccessor = dictionary.getVector().getAccessor();
+    Map<Object, Integer> lookUps = new HashMap<>(dictionaryAccessor.getValueCount());
+    for (int i = 0; i < dictionaryAccessor.getValueCount(); i++) {
+      // for primitive array types we need a wrapper that implements equals and hashcode appropriately
+      lookUps.put(dictionaryAccessor.getObject(i), i);
+    }
+
+    Field valueField = vector.getField();
+    Field indexField = new Field(valueField.getName(), valueField.isNullable(),
+      dictionary.getEncoding().getIndexType(), dictionary.getEncoding(), null);
+
+    // vector to hold our indices (dictionary encoded values)
+    FieldVector indices = indexField.createVector(vector.getAllocator());
+    ValueVector.Mutator mutator = indices.getMutator();
+
+    // use reflection to pull out the set method
+    // TODO implement a common interface for int vectors
+    Method setter = null;
+    for (Class<?> c: ImmutableList.of(int.class, long.class)) {
+      try {
+        setter = mutator.getClass().getMethod("set", int.class, c);
+        break;
+      } catch(NoSuchMethodException e) {
+        // ignore
+      }
+    }
+    if (setter == null) {
+      throw new IllegalArgumentException("Dictionary encoding does not have a valid int type:" + indices.getClass());
+    }
+
+    ValueVector.Accessor accessor = vector.getAccessor();
+    int count = accessor.getValueCount();
+
+    indices.allocateNew();
+
+    try {
+      for (int i = 0; i < count; i++) {
+        Object value = accessor.getObject(i);
+        if (value != null) { // if it's null leave it null
+          // note: this may fail if value was not included in the dictionary
+          Object encoded = lookUps.get(value);
+          if (encoded == null) {
+            throw new IllegalArgumentException("Dictionary encoding not defined for value:" + value);
+          }
+          setter.invoke(mutator, i, encoded);
+        }
+      }
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException("IllegalAccessException invoking vector mutator set():", e);
+    } catch (InvocationTargetException e) {
+      throw new RuntimeException("InvocationTargetException invoking vector mutator set():", e.getCause());
+    }
+
+    mutator.setValueCount(count);
+
+    return indices;
+  }
+
+  /**
+   * Decodes a dictionary encoded array using the provided dictionary.
+   *
+   * @param indices dictionary encoded values, must be int type
+   * @param dictionary dictionary used to decode the values
+   * @return vector with values restored from dictionary
+   */
+  public static ValueVector decode(ValueVector indices, Dictionary dictionary) {
+    ValueVector.Accessor accessor = indices.getAccessor();
+    int count = accessor.getValueCount();
+    ValueVector dictionaryVector = dictionary.getVector();
+    int dictionaryCount = dictionaryVector.getAccessor().getValueCount();
+    // copy the dictionary values into the decoded vector
+    TransferPair transfer = dictionaryVector.getTransferPair(indices.getAllocator());
+    transfer.getTo().allocateNewSafe();
+    for (int i = 0; i < count; i++) {
+      Object index = accessor.getObject(i);
+      if (index != null) {
+        int indexAsInt = ((Number) index).intValue();
+        if (indexAsInt > dictionaryCount) {
+          throw new IllegalArgumentException("Provided dictionary does not contain value for index " + indexAsInt);
+        }
+        transfer.copyValueSafe(indexAsInt, i);
+      }
+    }
+    // TODO do we need to worry about the field?
+    ValueVector decoded = transfer.getTo();
+    decoded.getMutator().setValueCount(count);
+    return decoded;
+  }
+
+  private static void validateType(MinorType type) {
+    // byte arrays don't work as keys in our dictionary map - we could wrap them with something to
+    // implement equals and hashcode if we want that functionality
+    if (type == MinorType.VARBINARY || type == MinorType.LIST || type == MinorType.MAP || type == MinorType.UNION) {
+      throw new IllegalArgumentException("Dictionary encoding for complex types not implemented: type " + type);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryProvider.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryProvider.java b/java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryProvider.java
new file mode 100644
index 0000000..63fde25
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryProvider.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.dictionary;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public interface DictionaryProvider {
+
+  public Dictionary lookup(long id);
+
+  public static class MapDictionaryProvider implements DictionaryProvider {
+
+    private final Map<Long, Dictionary> map;
+
+    public MapDictionaryProvider(Dictionary... dictionaries) {
+      this.map = new HashMap<>();
+      for (Dictionary dictionary: dictionaries) {
+        put(dictionary);
+      }
+    }
+
+    public void put(Dictionary dictionary) {
+      map.put(dictionary.getEncoding().getId(), dictionary);
+    }
+
+    @Override
+    public Dictionary lookup(long id) {
+      return map.get(id);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileReader.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileReader.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileReader.java
new file mode 100644
index 0000000..28440a1
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileReader.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.file;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.arrow.flatbuf.Footer;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.schema.ArrowDictionaryBatch;
+import org.apache.arrow.vector.schema.ArrowMessage;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.stream.MessageSerializer;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ArrowFileReader extends ArrowReader<SeekableReadChannel> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ArrowFileReader.class);
+
+  private ArrowFooter footer;
+  private int currentDictionaryBatch = 0;
+  private int currentRecordBatch = 0;
+
+  public ArrowFileReader(SeekableByteChannel in, BufferAllocator allocator) {
+    super(new SeekableReadChannel(in), allocator);
+  }
+
+  public ArrowFileReader(SeekableReadChannel in, BufferAllocator allocator) {
+    super(in, allocator);
+  }
+
+  @Override
+  protected Schema readSchema(SeekableReadChannel in) throws IOException {
+    if (footer == null) {
+      if (in.size() <= (ArrowMagic.MAGIC_LENGTH * 2 + 4)) {
+        throw new InvalidArrowFileException("file too small: " + in.size());
+      }
+      ByteBuffer buffer = ByteBuffer.allocate(4 + ArrowMagic.MAGIC_LENGTH);
+      long footerLengthOffset = in.size() - buffer.remaining();
+      in.setPosition(footerLengthOffset);
+      in.readFully(buffer);
+      buffer.flip();
+      byte[] array = buffer.array();
+      if (!ArrowMagic.validateMagic(Arrays.copyOfRange(array, 4, array.length))) {
+        throw new InvalidArrowFileException("missing Magic number " + Arrays.toString(buffer.array()));
+      }
+      int footerLength = MessageSerializer.bytesToInt(array);
+      if (footerLength <= 0 || footerLength + ArrowMagic.MAGIC_LENGTH * 2 + 4 > in.size()) {
+        throw new InvalidArrowFileException("invalid footer length: " + footerLength);
+      }
+      long footerOffset = footerLengthOffset - footerLength;
+      LOGGER.debug(String.format("Footer starts at %d, length: %d", footerOffset, footerLength));
+      ByteBuffer footerBuffer = ByteBuffer.allocate(footerLength);
+      in.setPosition(footerOffset);
+      in.readFully(footerBuffer);
+      footerBuffer.flip();
+      Footer footerFB = Footer.getRootAsFooter(footerBuffer);
+      this.footer = new ArrowFooter(footerFB);
+    }
+    return footer.getSchema();
+  }
+
+  @Override
+  protected ArrowMessage readMessage(SeekableReadChannel in, BufferAllocator allocator) throws IOException {
+    if (currentDictionaryBatch < footer.getDictionaries().size()) {
+      ArrowBlock block = footer.getDictionaries().get(currentDictionaryBatch++);
+      return readDictionaryBatch(in, block, allocator);
+    } else if (currentRecordBatch < footer.getRecordBatches().size()) {
+      ArrowBlock block = footer.getRecordBatches().get(currentRecordBatch++);
+      return readRecordBatch(in, block, allocator);
+    } else {
+      return null;
+    }
+  }
+
+  public List<ArrowBlock> getDictionaryBlocks() throws IOException {
+    ensureInitialized();
+    return footer.getDictionaries();
+  }
+
+  public List<ArrowBlock> getRecordBlocks() throws IOException {
+    ensureInitialized();
+    return footer.getRecordBatches();
+  }
+
+  public void loadRecordBatch(ArrowBlock block) throws IOException {
+    ensureInitialized();
+    int blockIndex = footer.getRecordBatches().indexOf(block);
+    if (blockIndex == -1) {
+      throw new IllegalArgumentException("Arrow bock does not exist in record batches: " + block);
+    }
+    currentRecordBatch = blockIndex;
+    loadNextBatch();
+  }
+
+  private ArrowDictionaryBatch readDictionaryBatch(SeekableReadChannel in,
+                                                   ArrowBlock block,
+                                                   BufferAllocator allocator) throws IOException {
+    LOGGER.debug(String.format("DictionaryRecordBatch at %d, metadata: %d, body: %d",
+       block.getOffset(), block.getMetadataLength(), block.getBodyLength()));
+    in.setPosition(block.getOffset());
+    ArrowDictionaryBatch batch = MessageSerializer.deserializeDictionaryBatch(in, block, allocator);
+    if (batch == null) {
+      throw new IOException("Invalid file. No batch at offset: " + block.getOffset());
+    }
+    return batch;
+  }
+
+  private ArrowRecordBatch readRecordBatch(SeekableReadChannel in,
+                                           ArrowBlock block,
+                                           BufferAllocator allocator) throws IOException {
+    LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d",
+        block.getOffset(), block.getMetadataLength(),
+        block.getBodyLength()));
+    in.setPosition(block.getOffset());
+    ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch(in, block, allocator);
+    if (batch == null) {
+      throw new IOException("Invalid file. No batch at offset: " + block.getOffset());
+    }
+    return batch;
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileWriter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileWriter.java
new file mode 100644
index 0000000..23d210a
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileWriter.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.file;
+
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ArrowFileWriter extends ArrowWriter {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ArrowFileWriter.class);
+
+  public ArrowFileWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
+    super(root, provider, out);
+  }
+
+  @Override
+  protected void startInternal(WriteChannel out) throws IOException {
+    ArrowMagic.writeMagic(out);
+  }
+
+  @Override
+  protected void endInternal(WriteChannel out,
+                             Schema schema,
+                             List<ArrowBlock> dictionaries,
+                             List<ArrowBlock> records) throws IOException {
+    long footerStart = out.getCurrentPosition();
+    out.write(new ArrowFooter(schema, dictionaries, records), false);
+    int footerLength = (int)(out.getCurrentPosition() - footerStart);
+    if (footerLength <= 0) {
+      throw new InvalidArrowFileException("invalid footer");
+    }
+    out.writeIntLittleEndian(footerLength);
+    LOGGER.debug(String.format("Footer starts at %d, length: %d", footerStart, footerLength));
+    ArrowMagic.writeMagic(out);
+    LOGGER.debug(String.format("magic written, now at %d", out.getCurrentPosition()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java
index 3890306..1c0008a 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java
@@ -38,7 +38,6 @@ public class ArrowFooter implements FBSerializable {
   private final List<ArrowBlock> recordBatches;
 
   public ArrowFooter(Schema schema, List<ArrowBlock> dictionaries, List<ArrowBlock> recordBatches) {
-    super();
     this.schema = schema;
     this.dictionaries = dictionaries;
     this.recordBatches = recordBatches;

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowMagic.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowMagic.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowMagic.java
new file mode 100644
index 0000000..99ea96b
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowMagic.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.file;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+public class ArrowMagic {
+
+  private static final byte[] MAGIC = "ARROW1".getBytes(StandardCharsets.UTF_8);
+
+  public static final int MAGIC_LENGTH = MAGIC.length;
+
+  public static void writeMagic(WriteChannel out) throws IOException {
+    out.write(MAGIC);
+  }
+
+  public static boolean validateMagic(byte[] array) {
+    return Arrays.equals(MAGIC, array);
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
index 8f4f497..1646fbe 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
@@ -18,90 +18,188 @@
 package org.apache.arrow.vector.file;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SeekableByteChannel;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
 
-import org.apache.arrow.flatbuf.Footer;
 import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.dictionary.Dictionary;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.arrow.vector.schema.ArrowDictionaryBatch;
+import org.apache.arrow.vector.schema.ArrowMessage;
+import org.apache.arrow.vector.schema.ArrowMessage.ArrowMessageVisitor;
 import org.apache.arrow.vector.schema.ArrowRecordBatch;
-import org.apache.arrow.vector.stream.MessageSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ArrowReader implements AutoCloseable {
-  private static final Logger LOGGER = LoggerFactory.getLogger(ArrowReader.class);
-
-  public static final byte[] MAGIC = "ARROW1".getBytes();
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.ArrowType.Int;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
 
-  private final SeekableByteChannel in;
+public abstract class ArrowReader<T extends ReadChannel> implements DictionaryProvider, AutoCloseable {
 
+  private final T in;
   private final BufferAllocator allocator;
 
-  private ArrowFooter footer;
+  private VectorLoader loader;
+  private VectorSchemaRoot root;
+  private Map<Long, Dictionary> dictionaries;
 
-  public ArrowReader(SeekableByteChannel in, BufferAllocator allocator) {
-    super();
+  private boolean initialized = false;
+
+  protected ArrowReader(T in, BufferAllocator allocator) {
     this.in = in;
     this.allocator = allocator;
   }
 
-  private int readFully(ByteBuffer buffer) throws IOException {
-    int total = 0;
-    int n;
-    do {
-      n = in.read(buffer);
-      total += n;
-    } while (n >= 0 && buffer.remaining() > 0);
-    buffer.flip();
-    return total;
+  /**
+   * Returns the vector schema root. This will be loaded with new values on every call to loadNextBatch
+   *
+   * @return the vector schema root
+   * @throws IOException if reading of schema fails
+   */
+  public VectorSchemaRoot getVectorSchemaRoot() throws IOException {
+    ensureInitialized();
+    return root;
   }
 
-  public ArrowFooter readFooter() throws IOException {
-    if (footer == null) {
-      if (in.size() <= (MAGIC.length * 2 + 4)) {
-        throw new InvalidArrowFileException("file too small: " + in.size());
-      }
-      ByteBuffer buffer = ByteBuffer.allocate(4 + MAGIC.length);
-      long footerLengthOffset = in.size() - buffer.remaining();
-      in.position(footerLengthOffset);
-      readFully(buffer);
-      byte[] array = buffer.array();
-      if (!Arrays.equals(MAGIC, Arrays.copyOfRange(array, 4, array.length))) {
-        throw new InvalidArrowFileException("missing Magic number " + Arrays.toString(buffer.array()));
-      }
-      int footerLength = MessageSerializer.bytesToInt(array);
-      if (footerLength <= 0 || footerLength + MAGIC.length * 2 + 4 > in.size()) {
-        throw new InvalidArrowFileException("invalid footer length: " + footerLength);
-      }
-      long footerOffset = footerLengthOffset - footerLength;
-      LOGGER.debug(String.format("Footer starts at %d, length: %d", footerOffset, footerLength));
-      ByteBuffer footerBuffer = ByteBuffer.allocate(footerLength);
-      in.position(footerOffset);
-      readFully(footerBuffer);
-      Footer footerFB = Footer.getRootAsFooter(footerBuffer);
-      this.footer = new ArrowFooter(footerFB);
+  /**
+   * Returns any dictionaries
+   *
+   * @return dictionaries, if any
+   * @throws IOException if reading of schema fails
+   */
+  public Map<Long, Dictionary> getDictionaryVectors() throws IOException {
+    ensureInitialized();
+    return dictionaries;
+  }
+
+  @Override
+  public Dictionary lookup(long id) {
+    if (initialized) {
+      return dictionaries.get(id);
+    } else {
+      return null;
     }
-    return footer;
   }
 
-  // TODO: read dictionaries
-
-  public ArrowRecordBatch readRecordBatch(ArrowBlock block) throws IOException {
-    LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d",
-        block.getOffset(), block.getMetadataLength(),
-        block.getBodyLength()));
-    in.position(block.getOffset());
-    ArrowRecordBatch batch =  MessageSerializer.deserializeRecordBatch(
-        new ReadChannel(in, block.getOffset()), block, allocator);
-    if (batch == null) {
-      throw new IOException("Invalid file. No batch at offset: " + block.getOffset());
+  public void loadNextBatch() throws IOException {
+    ensureInitialized();
+    // read in all dictionary batches, then stop after our first record batch
+    ArrowMessageVisitor<Boolean> visitor = new ArrowMessageVisitor<Boolean>() {
+      @Override
+      public Boolean visit(ArrowDictionaryBatch message) {
+        try { load(message); } finally { message.close(); }
+        return true;
+      }
+      @Override
+      public Boolean visit(ArrowRecordBatch message) {
+        try { loader.load(message); } finally { message.close(); }
+        return false;
+      }
+    };
+    root.setRowCount(0);
+    ArrowMessage message = readMessage(in, allocator);
+    while (message != null && message.accepts(visitor)) {
+      message = readMessage(in, allocator);
     }
-    return batch;
   }
 
+  public long bytesRead() { return in.bytesRead(); }
+
   @Override
   public void close() throws IOException {
+    if (initialized) {
+      root.close();
+      for (Dictionary dictionary: dictionaries.values()) {
+        dictionary.getVector().close();
+      }
+    }
     in.close();
   }
+
+  protected abstract Schema readSchema(T in) throws IOException;
+
+  protected abstract ArrowMessage readMessage(T in, BufferAllocator allocator) throws IOException;
+
+  protected void ensureInitialized() throws IOException {
+    if (!initialized) {
+      initialize();
+      initialized = true;
+    }
+  }
+
+  /**
+   * Reads the schema and initializes the vectors
+   */
+  private void initialize() throws IOException {
+    Schema schema = readSchema(in);
+    List<Field> fields = new ArrayList<>();
+    List<FieldVector> vectors = new ArrayList<>();
+    Map<Long, Dictionary> dictionaries = new HashMap<>();
+
+    for (Field field: schema.getFields()) {
+      Field updated = toMemoryFormat(field, dictionaries);
+      fields.add(updated);
+      vectors.add(updated.createVector(allocator));
+    }
+
+    this.root = new VectorSchemaRoot(fields, vectors, 0);
+    this.loader = new VectorLoader(root);
+    this.dictionaries = Collections.unmodifiableMap(dictionaries);
+  }
+
+  // in the message format, fields have the dictionary type
+  // in the memory format, they have the index type
+  private Field toMemoryFormat(Field field, Map<Long, Dictionary> dictionaries) {
+    DictionaryEncoding encoding = field.getDictionary();
+    List<Field> children = field.getChildren();
+
+    if (encoding == null && children.isEmpty()) {
+      return field;
+    }
+
+    List<Field> updatedChildren = new ArrayList<>(children.size());
+    for (Field child: children) {
+      updatedChildren.add(toMemoryFormat(child, dictionaries));
+    }
+
+    ArrowType type;
+    if (encoding == null) {
+      type = field.getType();
+    } else {
+      // re-type the field for in-memory format
+      type = encoding.getIndexType();
+      if (type == null) {
+        type = new Int(32, true);
+      }
+      // get existing or create dictionary vector
+      if (!dictionaries.containsKey(encoding.getId())) {
+        // create a new dictionary vector for the values
+        Field dictionaryField = new Field(field.getName(), field.isNullable(), field.getType(), null, children);
+        FieldVector dictionaryVector = dictionaryField.createVector(allocator);
+        dictionaries.put(encoding.getId(), new Dictionary(dictionaryVector, encoding));
+      }
+    }
+
+    return new Field(field.getName(), field.isNullable(), type, encoding, updatedChildren);
+  }
+
+  private void load(ArrowDictionaryBatch dictionaryBatch) {
+    long id = dictionaryBatch.getDictionaryId();
+    Dictionary dictionary = dictionaries.get(id);
+    if (dictionary == null) {
+      throw new IllegalArgumentException("Dictionary ID " + id + " not defined in schema");
+    }
+    FieldVector vector = dictionary.getVector();
+    VectorSchemaRoot root = new VectorSchemaRoot(ImmutableList.of(vector.getField()), ImmutableList.of(vector), 0);
+    VectorLoader loader = new VectorLoader(root);
+    loader.load(dictionaryBatch.getDictionary());
+  }
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java
index 24c667e..60a6afb 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,77 +21,172 @@ import java.io.IOException;
 import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import com.google.common.collect.ImmutableList;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.dictionary.Dictionary;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.arrow.vector.schema.ArrowDictionaryBatch;
 import org.apache.arrow.vector.schema.ArrowRecordBatch;
 import org.apache.arrow.vector.stream.MessageSerializer;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
+import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ArrowWriter implements AutoCloseable {
+public abstract class ArrowWriter implements AutoCloseable {
+
   private static final Logger LOGGER = LoggerFactory.getLogger(ArrowWriter.class);
 
+  // schema with fields in message format, not memory format
+  private final Schema schema;
   private final WriteChannel out;
 
-  private final Schema schema;
+  private final VectorUnloader unloader;
+  private final List<ArrowDictionaryBatch> dictionaries;
+
+  private final List<ArrowBlock> dictionaryBlocks = new ArrayList<>();
+  private final List<ArrowBlock> recordBlocks = new ArrayList<>();
 
-  private final List<ArrowBlock> recordBatches = new ArrayList<>();
   private boolean started = false;
+  private boolean ended = false;
 
-  public ArrowWriter(WritableByteChannel out, Schema schema) {
+  /**
+   * Note: fields are not closed when the writer is closed.
+   *
+   * @param root the set of vectors (with schema) whose contents will be written
+   * @param provider source of the dictionaries referenced by encoded fields in the schema
+   * @param out the channel the serialized output is written to
+   */
+  protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
+    this.unloader = new VectorUnloader(root);
     this.out = new WriteChannel(out);
-    this.schema = schema;
+
+    List<Field> fields = new ArrayList<>(root.getSchema().getFields().size());
+    Map<Long, ArrowDictionaryBatch> dictionaryBatches = new HashMap<>();
+
+    for (Field field: root.getSchema().getFields()) {
+      fields.add(toMessageFormat(field, provider, dictionaryBatches));
+    }
+
+    this.schema = new Schema(fields);
+    this.dictionaries = Collections.unmodifiableList(new ArrayList<>(dictionaryBatches.values()));
+  }
+
+  // in the message format, fields have the dictionary type
+  // in the memory format, they have the index type
+  private Field toMessageFormat(Field field, DictionaryProvider provider, Map<Long, ArrowDictionaryBatch> batches) {
+    DictionaryEncoding encoding = field.getDictionary();
+    List<Field> children = field.getChildren();
+
+    if (encoding == null && children.isEmpty()) {
+      return field;
+    }
+
+    List<Field> updatedChildren = new ArrayList<>(children.size());
+    for (Field child: children) {
+      updatedChildren.add(toMessageFormat(child, provider, batches));
+    }
+
+    ArrowType type;
+    if (encoding == null) {
+      type = field.getType();
+    } else {
+      long id = encoding.getId();
+      Dictionary dictionary = provider.lookup(id);
+      if (dictionary == null) {
+        throw new IllegalArgumentException("Could not find dictionary with ID " + id);
+      }
+      type = dictionary.getVectorType();
+
+      if (!batches.containsKey(id)) {
+        FieldVector vector = dictionary.getVector();
+        int count = vector.getAccessor().getValueCount();
+        VectorSchemaRoot root = new VectorSchemaRoot(ImmutableList.of(field), ImmutableList.of(vector), count);
+        VectorUnloader unloader = new VectorUnloader(root);
+        ArrowRecordBatch batch = unloader.getRecordBatch();
+        batches.put(id, new ArrowDictionaryBatch(id, batch));
+      }
+    }
+
+    return new Field(field.getName(), field.isNullable(), type, encoding, updatedChildren);
   }
 
-  private void start() throws IOException {
-    writeMagic();
-    MessageSerializer.serialize(out, schema);
+  public void start() throws IOException {
+    ensureStarted();
   }
 
-  // TODO: write dictionaries
+  public void writeBatch() throws IOException {
+    ensureStarted();
+    try (ArrowRecordBatch batch = unloader.getRecordBatch()) {
+      writeRecordBatch(batch);
+    }
+  }
 
-  public void writeRecordBatch(ArrowRecordBatch recordBatch) throws IOException {
-    checkStarted();
-    ArrowBlock batchDesc = MessageSerializer.serialize(out, recordBatch);
+  protected void writeRecordBatch(ArrowRecordBatch batch) throws IOException {
+    ArrowBlock block = MessageSerializer.serialize(out, batch);
     LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d",
-        batchDesc.getOffset(), batchDesc.getMetadataLength(), batchDesc.getBodyLength()));
+      block.getOffset(), block.getMetadataLength(), block.getBodyLength()));
+    recordBlocks.add(block);
+  }
 
-    // add metadata to footer
-    recordBatches.add(batchDesc);
+  public void end() throws IOException {
+    ensureStarted();
+    ensureEnded();
   }
 
-  private void checkStarted() throws IOException {
+  public long bytesWritten() { return out.getCurrentPosition(); }
+
+  private void ensureStarted() throws IOException {
     if (!started) {
       started = true;
-      start();
+      startInternal(out);
+      // write the schema - for file formats this is duplicated in the footer, but matches
+      // the streaming format
+      MessageSerializer.serialize(out, schema);
+      // write out any dictionaries
+      for (ArrowDictionaryBatch batch : dictionaries) {
+        try {
+          ArrowBlock block = MessageSerializer.serialize(out, batch);
+          LOGGER.debug(String.format("DictionaryRecordBatch at %d, metadata: %d, body: %d",
+            block.getOffset(), block.getMetadataLength(), block.getBodyLength()));
+          dictionaryBlocks.add(block);
+        } finally {
+          batch.close();
+        }
+      }
     }
   }
 
-  @Override
-  public void close() throws IOException {
-    try {
-      long footerStart = out.getCurrentPosition();
-      writeFooter();
-      int footerLength = (int)(out.getCurrentPosition() - footerStart);
-      if (footerLength <= 0 ) {
-        throw new InvalidArrowFileException("invalid footer");
-      }
-      out.writeIntLittleEndian(footerLength);
-      LOGGER.debug(String.format("Footer starts at %d, length: %d", footerStart, footerLength));
-      writeMagic();
-    } finally {
-      out.close();
+  private void ensureEnded() throws IOException {
+    if (!ended) {
+      ended = true;
+      endInternal(out, schema, dictionaryBlocks, recordBlocks);
     }
   }
 
-  private void writeMagic() throws IOException {
-    out.write(ArrowReader.MAGIC);
-    LOGGER.debug(String.format("magic written, now at %d", out.getCurrentPosition()));
-  }
+  protected abstract void startInternal(WriteChannel out) throws IOException;
+
+  protected abstract void endInternal(WriteChannel out,
+                                      Schema schema,
+                                      List<ArrowBlock> dictionaries,
+                                      List<ArrowBlock> records) throws IOException;
 
-  private void writeFooter() throws IOException {
-    // TODO: dictionaries
-    out.write(new ArrowFooter(schema, Collections.<ArrowBlock>emptyList(), recordBatches), false);
+  @Override
+  public void close() {
+    try {
+      end();
+      out.close();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java
index a9dc129..b062f38 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java
@@ -32,16 +32,9 @@ public class ReadChannel implements AutoCloseable {
 
   private ReadableByteChannel in;
   private long bytesRead = 0;
-  // The starting byte offset into 'in'.
-  private final long startByteOffset;
-
-  public ReadChannel(ReadableByteChannel in, long startByteOffset) {
-    this.in = in;
-    this.startByteOffset = startByteOffset;
-  }
 
   public ReadChannel(ReadableByteChannel in) {
-    this(in, 0);
+    this.in = in;
   }
 
   public long bytesRead() { return bytesRead; }
@@ -72,8 +65,6 @@ public class ReadChannel implements AutoCloseable {
     return n;
   }
 
-  public long getCurrentPositiion() { return startByteOffset + bytesRead; }
-
   @Override
   public void close() throws IOException {
     if (this.in != null) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/file/SeekableReadChannel.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/SeekableReadChannel.java b/java/vector/src/main/java/org/apache/arrow/vector/file/SeekableReadChannel.java
new file mode 100644
index 0000000..914c3cb
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/SeekableReadChannel.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.file;
+
+import java.io.IOException;
+import java.nio.channels.SeekableByteChannel;
+
+public class SeekableReadChannel extends ReadChannel {
+
+    private final SeekableByteChannel in;
+
+    public SeekableReadChannel(SeekableByteChannel in) {
+        super(in);
+        this.in = in;
+    }
+
+    public void setPosition(long position) throws IOException {
+        in.position(position);
+    }
+
+    public long size() throws IOException {
+        return in.size();
+    }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/file/WriteChannel.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/WriteChannel.java b/java/vector/src/main/java/org/apache/arrow/vector/file/WriteChannel.java
index d99c9a6..42104d1 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/WriteChannel.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/WriteChannel.java
@@ -21,13 +21,12 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 
-import org.apache.arrow.vector.schema.FBSerializable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.google.flatbuffers.FlatBufferBuilder;
 
 import io.netty.buffer.ArrowBuf;
+import org.apache.arrow.vector.schema.FBSerializable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Wrapper around a WritableByteChannel that maintains the position as well adding

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java b/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java
index 24fdc18..bdb63b9 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java
@@ -88,10 +88,34 @@ public class JsonFileReader implements AutoCloseable {
     }
   }
 
+  public void read(VectorSchemaRoot root) throws IOException {
+    JsonToken t = parser.nextToken();
+    if (t == START_OBJECT) {
+      {
+        int count = readNextField("count", Integer.class);
+        root.setRowCount(count);
+        nextFieldIs("columns");
+        readToken(START_ARRAY);
+        {
+          for (Field field : schema.getFields()) {
+            FieldVector vector = root.getVector(field.getName());
+            readVector(field, vector);
+          }
+        }
+        readToken(END_ARRAY);
+      }
+      readToken(END_OBJECT);
+    } else if (t == END_ARRAY) {
+      root.setRowCount(0);
+    } else {
+      throw new IllegalArgumentException("Invalid token: " + t);
+    }
+  }
+
   public VectorSchemaRoot read() throws IOException {
     JsonToken t = parser.nextToken();
     if (t == START_OBJECT) {
-      VectorSchemaRoot recordBatch = new VectorSchemaRoot(schema, allocator);
+      VectorSchemaRoot recordBatch = VectorSchemaRoot.create(schema, allocator);
       {
         int count = readNextField("count", Integer.class);
         recordBatch.setRowCount(count);

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowDictionaryBatch.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowDictionaryBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowDictionaryBatch.java
new file mode 100644
index 0000000..901877b
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowDictionaryBatch.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.schema;
+
+import com.google.flatbuffers.FlatBufferBuilder;
+import org.apache.arrow.flatbuf.DictionaryBatch;
+
+public class ArrowDictionaryBatch implements ArrowMessage {
+
+    private final long dictionaryId;
+    private final ArrowRecordBatch dictionary;
+
+    public ArrowDictionaryBatch(long dictionaryId, ArrowRecordBatch dictionary) {
+        this.dictionaryId = dictionaryId;
+        this.dictionary = dictionary;
+    }
+
+    public long getDictionaryId() { return dictionaryId; }
+    public ArrowRecordBatch getDictionary() { return dictionary; }
+
+    @Override
+    public int writeTo(FlatBufferBuilder builder) {
+        int dataOffset = dictionary.writeTo(builder);
+        DictionaryBatch.startDictionaryBatch(builder);
+        DictionaryBatch.addId(builder, dictionaryId);
+        DictionaryBatch.addData(builder, dataOffset);
+        return DictionaryBatch.endDictionaryBatch(builder);
+    }
+
+    @Override
+    public int computeBodyLength() { return dictionary.computeBodyLength(); }
+
+    @Override
+    public <T> T accepts(ArrowMessageVisitor<T> visitor) { return visitor.visit(this); }
+
+    @Override
+    public String toString() {
+       return "ArrowDictionaryBatch [dictionaryId=" + dictionaryId + ", dictionary=" + dictionary + "]";
+    }
+
+    @Override
+    public void close() {
+      dictionary.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowMessage.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowMessage.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowMessage.java
new file mode 100644
index 0000000..d307428
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowMessage.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.schema;
+
+public interface ArrowMessage extends FBSerializable, AutoCloseable {
+
+    public int computeBodyLength();
+
+    public <T> T accepts(ArrowMessageVisitor<T> visitor);
+
+    public static interface ArrowMessageVisitor<T> {
+        public T visit(ArrowDictionaryBatch message);
+        public T visit(ArrowRecordBatch message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java
index 40c2fbf..6ef514e 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java
@@ -32,7 +32,8 @@ import com.google.flatbuffers.FlatBufferBuilder;
 
 import io.netty.buffer.ArrowBuf;
 
-public class ArrowRecordBatch implements FBSerializable, AutoCloseable {
+public class ArrowRecordBatch implements ArrowMessage {
+
   private static final Logger LOGGER = LoggerFactory.getLogger(ArrowRecordBatch.class);
 
   /** number of records */
@@ -113,9 +114,13 @@ public class ArrowRecordBatch implements FBSerializable, AutoCloseable {
     return RecordBatch.endRecordBatch(builder);
   }
 
+  @Override
+  public <T> T accepts(ArrowMessageVisitor<T> visitor) { return visitor.visit(this); }
+
   /**
    * releases the buffers
    */
+  @Override
   public void close() {
     if (!closed) {
       closed = true;
@@ -134,6 +139,7 @@ public class ArrowRecordBatch implements FBSerializable, AutoCloseable {
   /**
    * Computes the size of the serialized body for this recordBatch.
    */
+  @Override
   public int computeBodyLength() {
     int size = 0;
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamReader.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamReader.java b/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamReader.java
index f32966c..2deef37 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamReader.java
@@ -17,79 +17,43 @@
  */
 package org.apache.arrow.vector.stream;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-
 import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.file.ArrowReader;
 import org.apache.arrow.vector.file.ReadChannel;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.schema.ArrowMessage;
 import org.apache.arrow.vector.types.pojo.Schema;
 
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
 
 /**
  * This classes reads from an input stream and produces ArrowRecordBatches.
  */
-public class ArrowStreamReader implements AutoCloseable {
-  private ReadChannel in;
-  private final BufferAllocator allocator;
-  private Schema schema;
-
-  /**
-   * Constructs a streaming read, reading bytes from 'in'. Non-blocking.
-   */
-  public ArrowStreamReader(ReadableByteChannel in, BufferAllocator allocator) {
-    super();
-    this.in = new ReadChannel(in);
-    this.allocator = allocator;
-  }
-
-  public ArrowStreamReader(InputStream in, BufferAllocator allocator) {
-    this(Channels.newChannel(in), allocator);
-  }
-
-  /**
-   * Initializes the reader. Must be called before the other APIs. This is blocking.
-   */
-  public void init() throws IOException {
-    Preconditions.checkState(this.schema == null, "Cannot call init() more than once.");
-    this.schema = readSchema();
-  }
+public class ArrowStreamReader extends ArrowReader<ReadChannel> {
 
-  /**
-   * Returns the schema for all records in this stream.
-   */
-  public Schema getSchema () {
-    Preconditions.checkState(this.schema != null, "Must call init() first.");
-    return schema;
-  }
-
-  public long bytesRead() { return in.bytesRead(); }
+    /**
+     * Constructs a streaming reader, reading bytes from 'in'. Non-blocking.
+     */
+    public ArrowStreamReader(ReadableByteChannel in, BufferAllocator allocator) {
+        super(new ReadChannel(in), allocator);
+    }
 
-  /**
-   * Reads and returns the next ArrowRecordBatch. Returns null if this is the end
-   * of stream.
-   */
-  public ArrowRecordBatch nextRecordBatch() throws IOException {
-    Preconditions.checkState(this.in != null, "Cannot call after close()");
-    Preconditions.checkState(this.schema != null, "Must call init() first.");
-    return MessageSerializer.deserializeRecordBatch(in, allocator);
-  }
+    public ArrowStreamReader(InputStream in, BufferAllocator allocator) {
+        this(Channels.newChannel(in), allocator);
+    }
 
-  @Override
-  public void close() throws IOException {
-    if (this.in != null) {
-      in.close();
-      in = null;
+    /**
+     * Reads the schema message from the beginning of the stream.
+     */
+    @Override
+    protected Schema readSchema(ReadChannel in) throws IOException {
+        return MessageSerializer.deserializeSchema(in);
     }
-  }
 
-  /**
-   * Reads the schema message from the beginning of the stream.
-   */
-  private Schema readSchema() throws IOException {
-    return MessageSerializer.deserializeSchema(in);
-  }
+    @Override
+    protected ArrowMessage readMessage(ReadChannel in, BufferAllocator allocator) throws IOException {
+        return MessageSerializer.deserializeMessageBatch(in, allocator);
+    }
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamWriter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamWriter.java
index 60dc586..ea29cd9 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamWriter.java
@@ -17,63 +17,40 @@
  */
 package org.apache.arrow.vector.stream;
 
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.arrow.vector.file.ArrowBlock;
+import org.apache.arrow.vector.file.ArrowWriter;
+import org.apache.arrow.vector.file.WriteChannel;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
+import java.util.List;
 
-import org.apache.arrow.vector.file.WriteChannel;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
-import org.apache.arrow.vector.types.pojo.Schema;
-
-public class ArrowStreamWriter implements AutoCloseable {
-  private final WriteChannel out;
-  private final Schema schema;
-  private boolean headerSent = false;
+public class ArrowStreamWriter extends ArrowWriter {
 
-  /**
-   * Creates the stream writer. non-blocking.
-   * totalBatches can be set if the writer knows beforehand. Can be -1 if unknown.
-   */
-  public ArrowStreamWriter(WritableByteChannel out, Schema schema) {
-    this.out = new WriteChannel(out);
-    this.schema = schema;
-  }
-
-  public ArrowStreamWriter(OutputStream out, Schema schema)
-      throws IOException {
-    this(Channels.newChannel(out), schema);
-  }
-
-  public long bytesWritten() { return out.getCurrentPosition(); }
-
-  public void writeRecordBatch(ArrowRecordBatch batch) throws IOException {
-    // Send the header if we have not yet.
-    checkAndSendHeader();
-    MessageSerializer.serialize(out, batch);
-  }
+    public ArrowStreamWriter(VectorSchemaRoot root, DictionaryProvider provider, OutputStream out) {
+       this(root, provider, Channels.newChannel(out));
+    }
 
-  /**
-   * End the stream. This is not required and this object can simply be closed.
-   */
-  public void end() throws IOException {
-    checkAndSendHeader();
-    out.writeIntLittleEndian(0);
-  }
+    public ArrowStreamWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
+       super(root, provider, out);
+    }
 
-  @Override
-  public void close() throws IOException {
-    // The header might not have been sent if this is an empty stream. Send it even in
-    // this case so readers see a valid empty stream.
-    checkAndSendHeader();
-    out.close();
-  }
+    @Override
+    protected void startInternal(WriteChannel out) throws IOException {}
 
-  private void checkAndSendHeader() throws IOException {
-    if (!headerSent) {
-      MessageSerializer.serialize(out, schema);
-      headerSent = true;
+    @Override
+    protected void endInternal(WriteChannel out,
+                               Schema schema,
+                               List<ArrowBlock> dictionaries,
+                               List<ArrowBlock> records) throws IOException {
+       out.writeIntLittleEndian(0);
     }
-  }
 }
-

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
index 92df250..92a6c0c 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
@@ -22,7 +22,11 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import com.google.flatbuffers.FlatBufferBuilder;
+
+import io.netty.buffer.ArrowBuf;
 import org.apache.arrow.flatbuf.Buffer;
+import org.apache.arrow.flatbuf.DictionaryBatch;
 import org.apache.arrow.flatbuf.FieldNode;
 import org.apache.arrow.flatbuf.Message;
 import org.apache.arrow.flatbuf.MessageHeader;
@@ -33,14 +37,12 @@ import org.apache.arrow.vector.file.ArrowBlock;
 import org.apache.arrow.vector.file.ReadChannel;
 import org.apache.arrow.vector.file.WriteChannel;
 import org.apache.arrow.vector.schema.ArrowBuffer;
+import org.apache.arrow.vector.schema.ArrowDictionaryBatch;
 import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.schema.ArrowMessage;
 import org.apache.arrow.vector.schema.ArrowRecordBatch;
 import org.apache.arrow.vector.types.pojo.Schema;
 
-import com.google.flatbuffers.FlatBufferBuilder;
-
-import io.netty.buffer.ArrowBuf;
-
 /**
  * Utility class for serializing Messages. Messages are all serialized a similar way.
  * 1. 4 byte little endian message header prefix
@@ -81,35 +83,39 @@ public class MessageSerializer {
    * Deserializes a schema object. Format is from serialize().
    */
   public static Schema deserializeSchema(ReadChannel in) throws IOException {
-    Message message = deserializeMessage(in, MessageHeader.Schema);
+    Message message = deserializeMessage(in);
     if (message == null) {
       throw new IOException("Unexpected end of input. Missing schema.");
     }
+    if (message.headerType() != MessageHeader.Schema) {
+      throw new IOException("Expected schema but header was " + message.headerType());
+    }
 
     return Schema.convertSchema((org.apache.arrow.flatbuf.Schema)
         message.header(new org.apache.arrow.flatbuf.Schema()));
   }
 
+
   /**
    * Serializes an ArrowRecordBatch. Returns the offset and length of the written batch.
    */
   public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch)
-      throws IOException {
+          throws IOException {
+
     long start = out.getCurrentPosition();
     int bodyLength = batch.computeBodyLength();
 
     FlatBufferBuilder builder = new FlatBufferBuilder();
     int batchOffset = batch.writeTo(builder);
 
-    ByteBuffer serializedMessage = serializeMessage(builder, MessageHeader.RecordBatch,
-        batchOffset, bodyLength);
+    ByteBuffer serializedMessage = serializeMessage(builder, MessageHeader.RecordBatch, batchOffset, bodyLength);
 
     int metadataLength = serializedMessage.remaining();
 
-    // Add extra padding bytes so that length prefix + metadata is a multiple
-    // of 8 after alignment
-    if ((start + metadataLength + 4) % 8 != 0) {
-        metadataLength += 8 - (start + metadataLength + 4) % 8;
+    // calculate alignment bytes so that metadata length points to the correct location after alignment
+    int padding = (int)((start + metadataLength + 4) % 8);
+    if (padding != 0) {
+        metadataLength += (8 - padding);
     }
 
     out.writeIntLittleEndian(metadataLength);
@@ -118,6 +124,13 @@ public class MessageSerializer {
     // Align the output to 8 byte boundary.
     out.align();
 
+    long bufferLength = writeBatchBuffers(out, batch);
+
+    // Metadata size in the Block account for the size prefix
+    return new ArrowBlock(start, metadataLength + 4, bufferLength);
+  }
+
+  private static long writeBatchBuffers(WriteChannel out, ArrowRecordBatch batch) throws IOException {
     long bufferStart = out.getCurrentPosition();
     List<ArrowBuf> buffers = batch.getBuffers();
     List<ArrowBuffer> buffersLayout = batch.getBuffersLayout();
@@ -135,22 +148,14 @@ public class MessageSerializer {
             " != " + startPosition + layout.getSize());
       }
     }
-    // Metadata size in the Block account for the size prefix
-    return new ArrowBlock(start, metadataLength + 4, out.getCurrentPosition() - bufferStart);
+    return out.getCurrentPosition() - bufferStart;
   }
 
   /**
    * Deserializes a RecordBatch
    */
-  public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in,
-      BufferAllocator alloc) throws IOException {
-    Message message = deserializeMessage(in, MessageHeader.RecordBatch);
-    if (message == null) return null;
-
-    if (message.bodyLength() > Integer.MAX_VALUE) {
-      throw new IOException("Cannot currently deserialize record batches over 2GB");
-    }
-
+  private static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, Message message, BufferAllocator alloc)
+      throws IOException {
     RecordBatch recordBatchFB = (RecordBatch) message.header(new RecordBatch());
 
     int bodyLength = (int) message.bodyLength();
@@ -191,9 +196,7 @@ public class MessageSerializer {
     // Now read the body
     final ArrowBuf body = buffer.slice(block.getMetadataLength(),
         (int) totalLen - block.getMetadataLength());
-    ArrowRecordBatch result = deserializeRecordBatch(recordBatchFB, body);
-
-    return result;
+    return deserializeRecordBatch(recordBatchFB, body);
   }
 
   // Deserializes a record batch given the Flatbuffer metadata and in-memory body
@@ -219,6 +222,106 @@ public class MessageSerializer {
   }
 
   /**
+   * Serializes a dictionary ArrowRecordBatch. Returns the offset and length of the written batch.
+   */
+  public static ArrowBlock serialize(WriteChannel out, ArrowDictionaryBatch batch) throws IOException {
+    long start = out.getCurrentPosition();
+    int bodyLength = batch.computeBodyLength();
+
+    FlatBufferBuilder builder = new FlatBufferBuilder();
+    int batchOffset = batch.writeTo(builder);
+
+    ByteBuffer serializedMessage = serializeMessage(builder, MessageHeader.DictionaryBatch, batchOffset, bodyLength);
+
+    int metadataLength = serializedMessage.remaining();
+
+    // Add extra padding bytes so that length prefix + metadata is a multiple
+    // of 8 after alignment
+    if ((start + metadataLength + 4) % 8 != 0) {
+      metadataLength += 8 - (start + metadataLength + 4) % 8;
+    }
+
+    out.writeIntLittleEndian(metadataLength);
+    out.write(serializedMessage);
+
+    // Align the output to 8 byte boundary.
+    out.align();
+
+    // write the embedded record batch
+    long bufferLength = writeBatchBuffers(out, batch.getDictionary());
+
+  // Metadata size in the Block accounts for the size prefix
+    return new ArrowBlock(start, metadataLength + 4, bufferLength + 8);
+  }
+
+  /**
+   * Deserializes a DictionaryBatch
+   */
+  private static ArrowDictionaryBatch deserializeDictionaryBatch(ReadChannel in,
+                                                                 Message message,
+                                                                 BufferAllocator alloc) throws IOException {
+    DictionaryBatch dictionaryBatchFB = (DictionaryBatch) message.header(new DictionaryBatch());
+
+    int bodyLength = (int) message.bodyLength();
+
+    // Now read the record batch body
+    ArrowBuf body = alloc.buffer(bodyLength);
+    if (in.readFully(body, bodyLength) != bodyLength) {
+      throw new IOException("Unexpected end of input trying to read batch.");
+    }
+    ArrowRecordBatch recordBatch = deserializeRecordBatch(dictionaryBatchFB.data(), body);
+    return new ArrowDictionaryBatch(dictionaryBatchFB.id(), recordBatch);
+  }
+
+  /**
+   * Deserializes a DictionaryBatch knowing the size of the entire message up front. This
+   * minimizes the number of reads to the underlying stream.
+   */
+  public static ArrowDictionaryBatch deserializeDictionaryBatch(ReadChannel in,
+                                                                ArrowBlock block,
+                                                                BufferAllocator alloc) throws IOException {
+    // Metadata length contains integer prefix plus byte padding
+    long totalLen = block.getMetadataLength() + block.getBodyLength();
+
+    if (totalLen > Integer.MAX_VALUE) {
+      throw new IOException("Cannot currently deserialize record batches over 2GB");
+    }
+
+    ArrowBuf buffer = alloc.buffer((int) totalLen);
+    if (in.readFully(buffer, (int) totalLen) != totalLen) {
+      throw new IOException("Unexpected end of input trying to read batch.");
+    }
+
+    ArrowBuf metadataBuffer = buffer.slice(4, block.getMetadataLength() - 4);
+
+    Message messageFB =
+        Message.getRootAsMessage(metadataBuffer.nioBuffer().asReadOnlyBuffer());
+
+    DictionaryBatch dictionaryBatchFB = (DictionaryBatch) messageFB.header(new DictionaryBatch());
+
+    // Now read the body
+    final ArrowBuf body = buffer.slice(block.getMetadataLength(),
+                                       (int) totalLen - block.getMetadataLength());
+    ArrowRecordBatch recordBatch = deserializeRecordBatch(dictionaryBatchFB.data(), body);
+    return new ArrowDictionaryBatch(dictionaryBatchFB.id(), recordBatch);
+  }
+
+  public static ArrowMessage deserializeMessageBatch(ReadChannel in, BufferAllocator alloc) throws IOException {
+    Message message = deserializeMessage(in);
+    if (message == null) {
+      return null;
+    } else if (message.bodyLength() > Integer.MAX_VALUE) {
+      throw new IOException("Cannot currently deserialize record batches over 2GB");
+    }
+
+    switch (message.headerType()) {
+      case MessageHeader.RecordBatch: return deserializeRecordBatch(in, message, alloc);
+      case MessageHeader.DictionaryBatch: return deserializeDictionaryBatch(in, message, alloc);
+      default: throw new IOException("Unexpected message header type " + message.headerType());
+    }
+  }
+
+  /**
    * Serializes a message header.
    */
   private static ByteBuffer serializeMessage(FlatBufferBuilder builder, byte headerType,
@@ -232,7 +335,7 @@ public class MessageSerializer {
     return builder.dataBuffer();
   }
 
-  private static Message deserializeMessage(ReadChannel in, byte headerType) throws IOException {
+  private static Message deserializeMessage(ReadChannel in) throws IOException {
     // Read the message size. There is an i32 little endian prefix.
     ByteBuffer buffer = ByteBuffer.allocate(4);
     if (in.readFully(buffer) != 4) return null;
@@ -246,11 +349,6 @@ public class MessageSerializer {
     }
     buffer.rewind();
 
-    Message message = Message.getRootAsMessage(buffer);
-    if (message.headerType() != headerType) {
-      throw new IOException("Invalid message: expecting " + headerType +
-          ". Message contained: " + message.headerType());
-    }
-    return message;
+    return Message.getRootAsMessage(buffer);
   }
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/types/Dictionary.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/types/Dictionary.java b/java/vector/src/main/java/org/apache/arrow/vector/types/Dictionary.java
deleted file mode 100644
index fbe1345..0000000
--- a/java/vector/src/main/java/org/apache/arrow/vector/types/Dictionary.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*******************************************************************************
-
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.arrow.vector.types;
-
-import org.apache.arrow.vector.ValueVector;
-
-public class Dictionary {
-
-    private ValueVector dictionary;
-    private boolean ordered;
-
-    public Dictionary(ValueVector dictionary, boolean ordered) {
-        this.dictionary = dictionary;
-        this.ordered = ordered;
-    }
-
-    public ValueVector getDictionary() {
-        return dictionary;
-    }
-
-    public boolean isOrdered() {
-        return ordered;
-    }
-}


[2/4] arrow git commit: ARROW-542: Adding dictionary encoding to FileWriter

Posted by we...@apache.org.
http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java b/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java
index ab539d5..8f2d042 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java
@@ -33,10 +33,10 @@ import org.apache.arrow.vector.NullableIntVector;
 import org.apache.arrow.vector.NullableIntervalDayVector;
 import org.apache.arrow.vector.NullableIntervalYearVector;
 import org.apache.arrow.vector.NullableSmallIntVector;
-import org.apache.arrow.vector.NullableTimeStampSecVector;
-import org.apache.arrow.vector.NullableTimeStampMilliVector;
 import org.apache.arrow.vector.NullableTimeStampMicroVector;
+import org.apache.arrow.vector.NullableTimeStampMilliVector;
 import org.apache.arrow.vector.NullableTimeStampNanoVector;
+import org.apache.arrow.vector.NullableTimeStampSecVector;
 import org.apache.arrow.vector.NullableTimeVector;
 import org.apache.arrow.vector.NullableTinyIntVector;
 import org.apache.arrow.vector.NullableUInt1Vector;
@@ -61,10 +61,10 @@ import org.apache.arrow.vector.complex.impl.IntervalDayWriterImpl;
 import org.apache.arrow.vector.complex.impl.IntervalYearWriterImpl;
 import org.apache.arrow.vector.complex.impl.NullableMapWriter;
 import org.apache.arrow.vector.complex.impl.SmallIntWriterImpl;
-import org.apache.arrow.vector.complex.impl.TimeStampSecWriterImpl;
-import org.apache.arrow.vector.complex.impl.TimeStampMilliWriterImpl;
 import org.apache.arrow.vector.complex.impl.TimeStampMicroWriterImpl;
+import org.apache.arrow.vector.complex.impl.TimeStampMilliWriterImpl;
 import org.apache.arrow.vector.complex.impl.TimeStampNanoWriterImpl;
+import org.apache.arrow.vector.complex.impl.TimeStampSecWriterImpl;
 import org.apache.arrow.vector.complex.impl.TimeWriterImpl;
 import org.apache.arrow.vector.complex.impl.TinyIntWriterImpl;
 import org.apache.arrow.vector.complex.impl.UInt1WriterImpl;
@@ -92,6 +92,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType.Time;
 import org.apache.arrow.vector.types.pojo.ArrowType.Timestamp;
 import org.apache.arrow.vector.types.pojo.ArrowType.Union;
 import org.apache.arrow.vector.types.pojo.ArrowType.Utf8;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.util.CallBack;
 
@@ -129,7 +130,7 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
         return ZeroVector.INSTANCE;
       }
 
@@ -145,8 +146,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-         return new NullableMapVector(name, allocator, callBack);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+         return new NullableMapVector(name, allocator, dictionary, callBack);
       }
 
       @Override
@@ -161,8 +162,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableTinyIntVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableTinyIntVector(name, allocator, dictionary);
       }
 
       @Override
@@ -177,8 +178,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableSmallIntVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableSmallIntVector(name, allocator, dictionary);
       }
 
       @Override
@@ -193,8 +194,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableIntVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableIntVector(name, allocator, dictionary);
       }
 
       @Override
@@ -209,8 +210,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableBigIntVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableBigIntVector(name, allocator, dictionary);
       }
 
       @Override
@@ -225,8 +226,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableDateVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableDateVector(name, allocator, dictionary);
       }
 
       @Override
@@ -241,8 +242,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableTimeVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableTimeVector(name, allocator, dictionary);
       }
 
       @Override
@@ -258,8 +259,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableTimeStampSecVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableTimeStampSecVector(name, allocator, dictionary);
       }
 
       @Override
@@ -275,8 +276,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableTimeStampMilliVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableTimeStampMilliVector(name, allocator, dictionary);
       }
 
       @Override
@@ -292,8 +293,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableTimeStampMicroVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableTimeStampMicroVector(name, allocator, dictionary);
       }
 
       @Override
@@ -309,8 +310,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableTimeStampNanoVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableTimeStampNanoVector(name, allocator, dictionary);
       }
 
       @Override
@@ -325,8 +326,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableIntervalDayVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableIntervalDayVector(name, allocator, dictionary);
       }
 
       @Override
@@ -341,8 +342,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableIntervalDayVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableIntervalDayVector(name, allocator, dictionary);
       }
 
       @Override
@@ -358,8 +359,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableFloat4Vector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableFloat4Vector(name, allocator, dictionary);
       }
 
       @Override
@@ -375,8 +376,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableFloat8Vector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableFloat8Vector(name, allocator, dictionary);
       }
 
       @Override
@@ -391,8 +392,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableBitVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableBitVector(name, allocator, dictionary);
       }
 
       @Override
@@ -407,8 +408,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableVarCharVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableVarCharVector(name, allocator, dictionary);
       }
 
       @Override
@@ -423,8 +424,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableVarBinaryVector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableVarBinaryVector(name, allocator, dictionary);
       }
 
       @Override
@@ -443,8 +444,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableDecimalVector(name, allocator, precisionScale[0], precisionScale[1]);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableDecimalVector(name, allocator, dictionary, precisionScale[0], precisionScale[1]);
       }
 
       @Override
@@ -459,8 +460,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableUInt1Vector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableUInt1Vector(name, allocator, dictionary);
       }
 
       @Override
@@ -475,8 +476,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableUInt2Vector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableUInt2Vector(name, allocator, dictionary);
       }
 
       @Override
@@ -491,8 +492,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableUInt4Vector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableUInt4Vector(name, allocator, dictionary);
       }
 
       @Override
@@ -507,8 +508,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new NullableUInt8Vector(name, allocator);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new NullableUInt8Vector(name, allocator, dictionary);
       }
 
       @Override
@@ -523,8 +524,8 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
-        return new ListVector(name, allocator, callBack);
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        return new ListVector(name, allocator, dictionary, callBack);
       }
 
       @Override
@@ -539,7 +540,10 @@ public class Types {
       }
 
       @Override
-      public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+      public FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale) {
+        if (dictionary != null) {
+          throw new UnsupportedOperationException("Dictionary encoding not supported for complex types");
+        }
         return new UnionVector(name, allocator, callBack);
       }
 
@@ -561,7 +565,7 @@ public class Types {
 
     public abstract Field getField();
 
-    public abstract FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale);
+    public abstract FieldVector getNewVector(String name, BufferAllocator allocator, DictionaryEncoding dictionary, CallBack callBack, int... precisionScale);
 
     public abstract FieldWriter getNewFieldWriter(ValueVector vector);
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/DictionaryEncoding.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/DictionaryEncoding.java b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/DictionaryEncoding.java
new file mode 100644
index 0000000..6d35cde
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/DictionaryEncoding.java
@@ -0,0 +1,51 @@
+/*******************************************************************************
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.arrow.vector.types.pojo;
+
+import org.apache.arrow.vector.types.pojo.ArrowType.Int;
+
+public class DictionaryEncoding {
+
+  private final long id;
+  private final boolean ordered;
+  private final Int indexType;
+
+  public DictionaryEncoding(long id, boolean ordered, Int indexType) {
+    this.id = id;
+    this.ordered = ordered;
+    this.indexType = indexType == null ? new Int(32, true) : indexType;
+  }
+
+  public long getId() {
+    return id;
+  }
+
+  public boolean isOrdered() {
+    return ordered;
+  }
+
+  public Int getIndexType() {
+    return indexType;
+  }
+
+  @Override
+  public String toString() {
+    return "DictionaryEncoding[id=" + id + ",ordered=" + ordered + ",indexType=" + indexType + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java
index f9b79ce..bbbd559 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java
@@ -24,23 +24,27 @@ import static org.apache.arrow.vector.types.pojo.ArrowType.getTypeForField;
 import java.util.List;
 import java.util.Objects;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonInclude.Include;
-import org.apache.arrow.flatbuf.DictionaryEncoding;
-import org.apache.arrow.vector.schema.TypeLayout;
-import org.apache.arrow.vector.schema.VectorLayout;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.flatbuffers.FlatBufferBuilder;
 
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.schema.TypeLayout;
+import org.apache.arrow.vector.schema.VectorLayout;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.ArrowType.Int;
+
 public class Field {
   private final String name;
   private final boolean nullable;
   private final ArrowType type;
-  private final Long dictionary;
+  private final DictionaryEncoding dictionary;
   private final List<Field> children;
   private final TypeLayout typeLayout;
 
@@ -49,7 +53,7 @@ public class Field {
       @JsonProperty("name") String name,
       @JsonProperty("nullable") boolean nullable,
       @JsonProperty("type") ArrowType type,
-      @JsonProperty("dictionary") Long dictionary,
+      @JsonProperty("dictionary") DictionaryEncoding dictionary,
       @JsonProperty("children") List<Field> children,
       @JsonProperty("typeLayout") TypeLayout typeLayout) {
     this.name = name;
@@ -68,18 +72,30 @@ public class Field {
     this(name, nullable, type, null, children, TypeLayout.getTypeLayout(checkNotNull(type)));
   }
 
-  public Field(String name, boolean nullable, ArrowType type, Long dictionary, List<Field> children) {
+  public Field(String name, boolean nullable, ArrowType type, DictionaryEncoding dictionary, List<Field> children) {
     this(name, nullable, type, dictionary, children, TypeLayout.getTypeLayout(checkNotNull(type)));
   }
 
+  public FieldVector createVector(BufferAllocator allocator) {
+    MinorType minorType = Types.getMinorTypeForArrowType(type);
+    FieldVector vector = minorType.getNewVector(name, allocator, dictionary, null);
+    vector.initializeChildrenFromFields(children);
+    return vector;
+  }
+
   public static Field convertField(org.apache.arrow.flatbuf.Field field) {
     String name = field.name();
     boolean nullable = field.nullable();
     ArrowType type = getTypeForField(field);
-    DictionaryEncoding dictionaryEncoding = field.dictionary();
-    Long dictionary = null;
-    if (dictionaryEncoding != null) {
-      dictionary = dictionaryEncoding.id();
+    DictionaryEncoding dictionary = null;
+    org.apache.arrow.flatbuf.DictionaryEncoding dictionaryFB = field.dictionary();
+    if (dictionaryFB != null) {
+      Int indexType = null;
+      org.apache.arrow.flatbuf.Int indexTypeFB = dictionaryFB.indexType();
+      if (indexTypeFB != null) {
+        indexType = new Int(indexTypeFB.bitWidth(), indexTypeFB.isSigned());
+      }
+      dictionary = new DictionaryEncoding(dictionaryFB.id(), dictionaryFB.isOrdered(), indexType);
     }
     ImmutableList.Builder<org.apache.arrow.vector.schema.VectorLayout> layout = ImmutableList.builder();
     for (int i = 0; i < field.layoutLength(); ++i) {
@@ -105,8 +121,11 @@ public class Field {
     int typeOffset = type.getType(builder);
     int dictionaryOffset = -1;
     if (dictionary != null) {
-      builder.addLong(dictionary);
-      dictionaryOffset = builder.offset();
+      // TODO encode dictionary type - currently type is only signed 32 bit int (default null)
+      org.apache.arrow.flatbuf.DictionaryEncoding.startDictionaryEncoding(builder);
+      org.apache.arrow.flatbuf.DictionaryEncoding.addId(builder, dictionary.getId());
+      org.apache.arrow.flatbuf.DictionaryEncoding.addIsOrdered(builder, dictionary.isOrdered());
+      dictionaryOffset = org.apache.arrow.flatbuf.DictionaryEncoding.endDictionaryEncoding(builder);
     }
     int[] childrenData = new int[children.size()];
     for (int i = 0; i < children.size(); i++) {
@@ -126,11 +145,11 @@ public class Field {
     org.apache.arrow.flatbuf.Field.addNullable(builder, nullable);
     org.apache.arrow.flatbuf.Field.addTypeType(builder, type.getTypeID().getFlatbufID());
     org.apache.arrow.flatbuf.Field.addType(builder, typeOffset);
+    org.apache.arrow.flatbuf.Field.addChildren(builder, childrenOffset);
+    org.apache.arrow.flatbuf.Field.addLayout(builder, layoutOffset);
     if (dictionary != null) {
       org.apache.arrow.flatbuf.Field.addDictionary(builder, dictionaryOffset);
     }
-    org.apache.arrow.flatbuf.Field.addChildren(builder, childrenOffset);
-    org.apache.arrow.flatbuf.Field.addLayout(builder, layoutOffset);
     return org.apache.arrow.flatbuf.Field.endField(builder);
   }
 
@@ -147,7 +166,7 @@ public class Field {
   }
 
   @JsonInclude(Include.NON_NULL)
-  public Long getDictionary() { return dictionary; }
+  public DictionaryEncoding getDictionary() { return dictionary; }
 
   public List<Field> getChildren() {
     return children;
@@ -168,8 +187,8 @@ public class Field {
             Objects.equals(this.type, that.type) &&
            Objects.equals(this.dictionary, that.dictionary) &&
             (Objects.equals(this.children, that.children) ||
-                    (this.children == null && that.children.size() == 0) ||
-                    (this.children.size() == 0 && that.children == null));
+                    (this.children == null || this.children.size() == 0) &&
+                    (that.children == null || that.children.size() == 0));
   }
 
   @Override
@@ -180,7 +199,7 @@ public class Field {
     }
     sb.append(type);
     if (dictionary != null) {
-      sb.append("[dictionary: ").append(dictionary).append("]");
+      sb.append("[dictionary: ").append(dictionary.getId()).append("]");
     }
     if (!children.isEmpty()) {
       sb.append("<").append(Joiner.on(", ").join(children)).append(">");

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/TestDecimalVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestDecimalVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestDecimalVector.java
index cca35e4..20f4aa8 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestDecimalVector.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestDecimalVector.java
@@ -44,7 +44,7 @@ public class TestDecimalVector {
   @Test
   public void test() {
     BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
-    NullableDecimalVector decimalVector = new NullableDecimalVector("decimal", allocator, 10, scale);
+    NullableDecimalVector decimalVector = new NullableDecimalVector("decimal", allocator, null, 10, scale);
     decimalVector.allocateNew();
     BigDecimal[] values = new BigDecimal[intValues.length];
     for (int i = 0; i < intValues.length; i++) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/TestDictionaryVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestDictionaryVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestDictionaryVector.java
index 962950a..e3087ef 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestDictionaryVector.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestDictionaryVector.java
@@ -18,16 +18,16 @@
 package org.apache.arrow.vector;
 
 import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.complex.DictionaryVector;
-import org.apache.arrow.vector.types.Dictionary;
+import org.apache.arrow.vector.dictionary.DictionaryEncoder;
+import org.apache.arrow.vector.dictionary.Dictionary;
 import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.nio.charset.StandardCharsets;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
 public class TestDictionaryVector {
@@ -49,65 +49,10 @@ public class TestDictionaryVector {
   }
 
   @Test
-  public void testEncodeStringsWithGeneratedDictionary() {
+  public void testEncodeStrings() {
     // Create a new value vector
-    try (final NullableVarCharVector vector = (NullableVarCharVector) MinorType.VARCHAR.getNewVector("foo", allocator, null)) {
-      final NullableVarCharVector.Mutator m = vector.getMutator();
-      vector.allocateNew(512, 5);
-
-      // set some values
-      m.setSafe(0, zero, 0, zero.length);
-      m.setSafe(1, one, 0, one.length);
-      m.setSafe(2, one, 0, one.length);
-      m.setSafe(3, two, 0, two.length);
-      m.setSafe(4, zero, 0, zero.length);
-      m.setValueCount(5);
-
-      DictionaryVector encoded = DictionaryVector.encode(vector);
-
-      try {
-        // verify values in the dictionary
-        ValueVector dictionary = encoded.getDictionaryVector();
-        assertEquals(vector.getClass(), dictionary.getClass());
-
-        NullableVarCharVector.Accessor dictionaryAccessor = ((NullableVarCharVector) dictionary).getAccessor();
-        assertEquals(3, dictionaryAccessor.getValueCount());
-        assertArrayEquals(zero, dictionaryAccessor.get(0));
-        assertArrayEquals(one, dictionaryAccessor.get(1));
-        assertArrayEquals(two, dictionaryAccessor.get(2));
-
-        // verify indices
-        ValueVector indices = encoded.getIndexVector();
-        assertEquals(NullableIntVector.class, indices.getClass());
-
-        NullableIntVector.Accessor indexAccessor = ((NullableIntVector) indices).getAccessor();
-        assertEquals(5, indexAccessor.getValueCount());
-        assertEquals(0, indexAccessor.get(0));
-        assertEquals(1, indexAccessor.get(1));
-        assertEquals(1, indexAccessor.get(2));
-        assertEquals(2, indexAccessor.get(3));
-        assertEquals(0, indexAccessor.get(4));
-
-        // now run through the decoder and verify we get the original back
-        try (ValueVector decoded = DictionaryVector.decode(indices, encoded.getDictionary())) {
-          assertEquals(vector.getClass(), decoded.getClass());
-          assertEquals(vector.getAccessor().getValueCount(), decoded.getAccessor().getValueCount());
-          for (int i = 0; i < 5; i++) {
-            assertEquals(vector.getAccessor().getObject(i), decoded.getAccessor().getObject(i));
-          }
-        }
-      } finally {
-        encoded.getDictionaryVector().close();
-        encoded.getIndexVector().close();
-      }
-    }
-  }
-
-  @Test
-  public void testEncodeStringsWithProvidedDictionary() {
-    // Create a new value vector
-    try (final NullableVarCharVector vector = (NullableVarCharVector) MinorType.VARCHAR.getNewVector("foo", allocator, null);
-         final NullableVarCharVector dictionary = (NullableVarCharVector) MinorType.VARCHAR.getNewVector("dict", allocator, null)) {
+    try (final NullableVarCharVector vector = (NullableVarCharVector) MinorType.VARCHAR.getNewVector("foo", allocator, null, null);
+         final NullableVarCharVector dictionaryVector = (NullableVarCharVector) MinorType.VARCHAR.getNewVector("dict", allocator, null, null)) {
       final NullableVarCharVector.Mutator m = vector.getMutator();
       vector.allocateNew(512, 5);
 
@@ -120,19 +65,20 @@ public class TestDictionaryVector {
       m.setValueCount(5);
 
       // set some dictionary values
-      final NullableVarCharVector.Mutator m2 = dictionary.getMutator();
-      dictionary.allocateNew(512, 3);
+      final NullableVarCharVector.Mutator m2 = dictionaryVector.getMutator();
+      dictionaryVector.allocateNew(512, 3);
       m2.setSafe(0, zero, 0, zero.length);
       m2.setSafe(1, one, 0, one.length);
       m2.setSafe(2, two, 0, two.length);
       m2.setValueCount(3);
 
-      try(final DictionaryVector encoded = DictionaryVector.encode(vector, new Dictionary(dictionary, false))) {
+      Dictionary dictionary = new Dictionary(dictionaryVector, new DictionaryEncoding(1L, false, null));
+
+      try(final ValueVector encoded = (FieldVector) DictionaryEncoder.encode(vector, dictionary)) {
         // verify indices
-        ValueVector indices = encoded.getIndexVector();
-        assertEquals(NullableIntVector.class, indices.getClass());
+        assertEquals(NullableIntVector.class, encoded.getClass());
 
-        NullableIntVector.Accessor indexAccessor = ((NullableIntVector) indices).getAccessor();
+        NullableIntVector.Accessor indexAccessor = ((NullableIntVector) encoded).getAccessor();
         assertEquals(5, indexAccessor.getValueCount());
         assertEquals(0, indexAccessor.get(0));
         assertEquals(1, indexAccessor.get(1));
@@ -141,7 +87,7 @@ public class TestDictionaryVector {
         assertEquals(0, indexAccessor.get(4));
 
         // now run through the decoder and verify we get the original back
-        try (ValueVector decoded = DictionaryVector.decode(indices, encoded.getDictionary())) {
+        try (ValueVector decoded = DictionaryEncoder.decode(encoded, dictionary)) {
           assertEquals(vector.getClass(), decoded.getClass());
           assertEquals(vector.getAccessor().getValueCount(), decoded.getAccessor().getValueCount());
           for (int i = 0; i < 5; i++) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java
index 1f0baae..18d93b6 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java
@@ -42,8 +42,8 @@ public class TestListVector {
 
   @Test
   public void testCopyFrom() throws Exception {
-    try (ListVector inVector = new ListVector("input", allocator, null);
-         ListVector outVector = new ListVector("output", allocator, null)) {
+    try (ListVector inVector = new ListVector("input", allocator, null, null);
+         ListVector outVector = new ListVector("output", allocator, null, null)) {
       UnionListWriter writer = inVector.getWriter();
       writer.allocate();
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java
index 774b59e..6917638 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java
@@ -86,7 +86,7 @@ public class TestValueVector {
   public void testNullableVarLen2() {
 
     // Create a new value vector for 1024 integers.
-    try (final NullableVarCharVector vector = new NullableVarCharVector(EMPTY_SCHEMA_PATH, allocator)) {
+    try (final NullableVarCharVector vector = new NullableVarCharVector(EMPTY_SCHEMA_PATH, allocator, null)) {
       final NullableVarCharVector.Mutator m = vector.getMutator();
       vector.allocateNew(1024 * 10, 1024);
 
@@ -116,7 +116,7 @@ public class TestValueVector {
   public void testNullableFixedType() {
 
     // Create a new value vector for 1024 integers.
-    try (final NullableUInt4Vector vector = new NullableUInt4Vector(EMPTY_SCHEMA_PATH, allocator)) {
+    try (final NullableUInt4Vector vector = new NullableUInt4Vector(EMPTY_SCHEMA_PATH, allocator, null)) {
       final NullableUInt4Vector.Mutator m = vector.getMutator();
       vector.allocateNew(1024);
 
@@ -186,7 +186,7 @@ public class TestValueVector {
   @Test
   public void testNullableFloat() {
     // Create a new value vector for 1024 integers
-    try (final NullableFloat4Vector vector = (NullableFloat4Vector) MinorType.FLOAT4.getNewVector(EMPTY_SCHEMA_PATH, allocator, null)) {
+    try (final NullableFloat4Vector vector = (NullableFloat4Vector) MinorType.FLOAT4.getNewVector(EMPTY_SCHEMA_PATH, allocator, null, null)) {
       final NullableFloat4Vector.Mutator m = vector.getMutator();
       vector.allocateNew(1024);
 
@@ -233,7 +233,7 @@ public class TestValueVector {
   @Test
   public void testNullableInt() {
     // Create a new value vector for 1024 integers
-    try (final NullableIntVector vector = (NullableIntVector) MinorType.INT.getNewVector(EMPTY_SCHEMA_PATH, allocator, null)) {
+    try (final NullableIntVector vector = (NullableIntVector) MinorType.INT.getNewVector(EMPTY_SCHEMA_PATH, allocator, null, null)) {
       final NullableIntVector.Mutator m = vector.getMutator();
       vector.allocateNew(1024);
 
@@ -403,7 +403,7 @@ public class TestValueVector {
   @Test
   public void testReAllocNullableFixedWidthVector() {
     // Create a new value vector for 1024 integers
-    try (final NullableFloat4Vector vector = (NullableFloat4Vector) MinorType.FLOAT4.getNewVector(EMPTY_SCHEMA_PATH, allocator, null)) {
+    try (final NullableFloat4Vector vector = (NullableFloat4Vector) MinorType.FLOAT4.getNewVector(EMPTY_SCHEMA_PATH, allocator, null, null)) {
       final NullableFloat4Vector.Mutator m = vector.getMutator();
       vector.allocateNew(1024);
 
@@ -436,7 +436,7 @@ public class TestValueVector {
   @Test
   public void testReAllocNullableVariableWidthVector() {
     // Create a new value vector for 1024 integers
-    try (final NullableVarCharVector vector = (NullableVarCharVector) MinorType.VARCHAR.getNewVector(EMPTY_SCHEMA_PATH, allocator, null)) {
+    try (final NullableVarCharVector vector = (NullableVarCharVector) MinorType.VARCHAR.getNewVector(EMPTY_SCHEMA_PATH, allocator, null, null)) {
       final NullableVarCharVector.Mutator m = vector.getMutator();
       vector.allocateNew();
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
index 79c9d50..372bcf0 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import io.netty.buffer.ArrowBuf;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.complex.MapVector;
@@ -46,8 +47,6 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Test;
 
-import io.netty.buffer.ArrowBuf;
-
 public class TestVectorUnloadLoad {
 
   static final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
@@ -81,8 +80,8 @@ public class TestVectorUnloadLoad {
       try (
           ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
           BufferAllocator finalVectorsAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
-          VectorSchemaRoot newRoot = new VectorSchemaRoot(schema, finalVectorsAllocator);
-          ) {
+          VectorSchemaRoot newRoot = VectorSchemaRoot.create(schema, finalVectorsAllocator);
+      ) {
 
         // load it
         VectorLoader vectorLoader = new VectorLoader(newRoot);
@@ -131,8 +130,8 @@ public class TestVectorUnloadLoad {
       try (
           ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
           BufferAllocator finalVectorsAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
-          VectorSchemaRoot newRoot = new VectorSchemaRoot(schema, finalVectorsAllocator);
-          ) {
+          VectorSchemaRoot newRoot = VectorSchemaRoot.create(schema, finalVectorsAllocator);
+      ) {
         List<ArrowBuf> oldBuffers = recordBatch.getBuffers();
         List<ArrowBuf> newBuffers = new ArrayList<>();
         for (ArrowBuf oldBuffer : oldBuffers) {
@@ -185,7 +184,7 @@ public class TestVectorUnloadLoad {
     Schema schema = new Schema(asList(
         new Field("intDefined", true, new ArrowType.Int(32, true), Collections.<Field>emptyList()),
         new Field("intNull", true, new ArrowType.Int(32, true), Collections.<Field>emptyList())
-        ));
+                                     ));
     int count = 10;
     ArrowBuf validity = allocator.buffer(10).slice(0, 0);
     ArrowBuf[] values = new ArrowBuf[2];
@@ -200,8 +199,8 @@ public class TestVectorUnloadLoad {
     try (
         ArrowRecordBatch recordBatch = new ArrowRecordBatch(count, asList(new ArrowFieldNode(count, 0), new ArrowFieldNode(count, count)), asList(validity, values[0], validity, values[1]));
         BufferAllocator finalVectorsAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
-        VectorSchemaRoot newRoot = new VectorSchemaRoot(schema, finalVectorsAllocator);
-        ) {
+        VectorSchemaRoot newRoot = VectorSchemaRoot.create(schema, finalVectorsAllocator);
+    ) {
 
       // load it
       VectorLoader vectorLoader = new VectorLoader(newRoot);
@@ -244,11 +243,12 @@ public class TestVectorUnloadLoad {
     Schema schema = new Schema(root.getField().getChildren());
     int valueCount = root.getAccessor().getValueCount();
     List<FieldVector> fields = root.getChildrenFromFields();
-    return new VectorUnloader(schema, valueCount, fields);
+    VectorSchemaRoot vsr = new VectorSchemaRoot(schema.getFields(), fields, valueCount);
+    return new VectorUnloader(vsr);
   }
 
   @AfterClass
   public static void afterClass() {
     allocator.close();
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestPromotableWriter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestPromotableWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestPromotableWriter.java
index 58312b3..2b49d8e 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestPromotableWriter.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestPromotableWriter.java
@@ -53,7 +53,7 @@ public class TestPromotableWriter {
   public void testPromoteToUnion() throws Exception {
 
     try (final MapVector container = new MapVector(EMPTY_SCHEMA_PATH, allocator, null);
-         final NullableMapVector v = container.addOrGet("test", MinorType.MAP, NullableMapVector.class);
+         final NullableMapVector v = container.addOrGet("test", MinorType.MAP, NullableMapVector.class, null);
          final PromotableWriter writer = new PromotableWriter(v, container)) {
 
       container.allocateNew();

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java
index 7a2d416..a8a2d51 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java
@@ -181,7 +181,7 @@ public class TestComplexWriter {
 
   @Test
   public void listScalarType() {
-    ListVector listVector = new ListVector("list", allocator, null);
+    ListVector listVector = new ListVector("list", allocator, null, null);
     listVector.allocateNew();
     UnionListWriter listWriter = new UnionListWriter(listVector);
     for (int i = 0; i < COUNT; i++) {
@@ -204,7 +204,7 @@ public class TestComplexWriter {
 
   @Test
   public void listScalarTypeNullable() {
-    ListVector listVector = new ListVector("list", allocator, null);
+    ListVector listVector = new ListVector("list", allocator, null, null);
     listVector.allocateNew();
     UnionListWriter listWriter = new UnionListWriter(listVector);
     for (int i = 0; i < COUNT; i++) {
@@ -233,7 +233,7 @@ public class TestComplexWriter {
 
   @Test
   public void listMapType() {
-    ListVector listVector = new ListVector("list", allocator, null);
+    ListVector listVector = new ListVector("list", allocator, null, null);
     listVector.allocateNew();
     UnionListWriter listWriter = new UnionListWriter(listVector);
     MapWriter mapWriter = listWriter.map();
@@ -261,7 +261,7 @@ public class TestComplexWriter {
 
   @Test
   public void listListType() {
-    try (ListVector listVector = new ListVector("list", allocator, null)) {
+    try (ListVector listVector = new ListVector("list", allocator, null, null)) {
       listVector.allocateNew();
       UnionListWriter listWriter = new UnionListWriter(listVector);
       for (int i = 0; i < COUNT; i++) {
@@ -286,7 +286,7 @@ public class TestComplexWriter {
    */
   @Test
   public void listListType2() {
-    try (ListVector listVector = new ListVector("list", allocator, null)) {
+    try (ListVector listVector = new ListVector("list", allocator, null, null)) {
       listVector.allocateNew();
       UnionListWriter listWriter = new UnionListWriter(listVector);
       ListWriter innerListWriter = listWriter.list();
@@ -324,7 +324,7 @@ public class TestComplexWriter {
 
   @Test
   public void unionListListType() {
-    try (ListVector listVector = new ListVector("list", allocator, null)) {
+    try (ListVector listVector = new ListVector("list", allocator, null, null)) {
       listVector.allocateNew();
       UnionListWriter listWriter = new UnionListWriter(listVector);
       for (int i = 0; i < COUNT; i++) {
@@ -353,7 +353,7 @@ public class TestComplexWriter {
    */
   @Test
   public void unionListListType2() {
-    try (ListVector listVector = new ListVector("list", allocator, null)) {
+    try (ListVector listVector = new ListVector("list", allocator, null, null)) {
       listVector.allocateNew();
       UnionListWriter listWriter = new UnionListWriter(listVector);
       ListWriter innerListWriter = listWriter.list();

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
index a83a283..75e5d2d 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
@@ -17,31 +17,44 @@
  */
 package org.apache.arrow.vector.file;
 
-import static org.apache.arrow.vector.TestVectorUnloadLoad.newVectorUnloader;
-import static org.junit.Assert.assertTrue;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.List;
 
+import com.google.common.collect.ImmutableList;
+
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.NullableTinyIntVector;
+import org.apache.arrow.vector.NullableVarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.complex.ListVector;
 import org.apache.arrow.vector.complex.MapVector;
 import org.apache.arrow.vector.complex.NullableMapVector;
+import org.apache.arrow.vector.complex.impl.UnionListWriter;
+import org.apache.arrow.vector.dictionary.Dictionary;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider;
+import org.apache.arrow.vector.dictionary.DictionaryEncoder;
 import org.apache.arrow.vector.schema.ArrowBuffer;
+import org.apache.arrow.vector.schema.ArrowMessage;
 import org.apache.arrow.vector.schema.ArrowRecordBatch;
 import org.apache.arrow.vector.stream.ArrowStreamReader;
 import org.apache.arrow.vector.stream.ArrowStreamWriter;
+import org.apache.arrow.vector.stream.MessageSerializerTest;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.ArrowType.Int;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
+import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.Text;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -68,7 +81,7 @@ public class TestArrowFile extends BaseFileTest {
     int count = COUNT;
     try (
         BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
-        NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null)) {
+        NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null, null)) {
       writeComplexData(count, parent);
       FieldVector root = parent.getChild("root");
       validateComplexContent(count, new VectorSchemaRoot(root));
@@ -83,71 +96,63 @@ public class TestArrowFile extends BaseFileTest {
     int count = COUNT;
 
     // write
-    try (
-        BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
-        MapVector parent = new MapVector("parent", originalVectorAllocator, null)) {
+    try (BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
+         MapVector parent = new MapVector("parent", originalVectorAllocator, null)) {
       writeData(count, parent);
       write(parent.getChild("root"), file, stream);
     }
 
     // read
-    try (
-        BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
-        FileInputStream fileInputStream = new FileInputStream(file);
-        ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator);
-        BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
-        MapVector parent = new MapVector("parent", vectorAllocator, null)
-        ) {
-      ArrowFooter footer = arrowReader.readFooter();
-      Schema schema = footer.getSchema();
-      LOGGER.debug("reading schema: " + schema);
-
-      // initialize vectors
-
-      try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator)) {
-        VectorLoader vectorLoader = new VectorLoader(root);
-
-        List<ArrowBlock> recordBatches = footer.getRecordBatches();
-        for (ArrowBlock rbBlock : recordBatches) {
-          try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
-            List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
-            for (ArrowBuffer arrowBuffer : buffersLayout) {
-              Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         FileInputStream fileInputStream = new FileInputStream(file);
+         ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator){
+            @Override
+            protected ArrowMessage readMessage(SeekableReadChannel in, BufferAllocator allocator) throws IOException {
+              ArrowMessage message = super.readMessage(in, allocator);
+              if (message != null) {
+                ArrowRecordBatch batch = (ArrowRecordBatch) message;
+                List<ArrowBuffer> buffersLayout = batch.getBuffersLayout();
+                for (ArrowBuffer arrowBuffer : buffersLayout) {
+                  Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
+                }
+              }
+              return message;
             }
-            vectorLoader.load(recordBatch);
-          }
-
-          validateContent(count, root);
-        }
+         }) {
+      Schema schema = arrowReader.getVectorSchemaRoot().getSchema();
+      LOGGER.debug("reading schema: " + schema);
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) {
+        arrowReader.loadRecordBatch(rbBlock);
+        Assert.assertEquals(count, root.getRowCount());
+        validateContent(count, root);
       }
     }
 
     // Read from stream.
-    try (
-        BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
-        ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
-        ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator);
-        BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
-        MapVector parent = new MapVector("parent", vectorAllocator, null)
-        ) {
-      arrowReader.init();
-      Schema schema = arrowReader.getSchema();
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
+         ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator){
+           @Override
+           protected ArrowMessage readMessage(ReadChannel in, BufferAllocator allocator) throws IOException {
+             ArrowMessage message = super.readMessage(in, allocator);
+             if (message != null) {
+               ArrowRecordBatch batch = (ArrowRecordBatch) message;
+               List<ArrowBuffer> buffersLayout = batch.getBuffersLayout();
+               for (ArrowBuffer arrowBuffer : buffersLayout) {
+                 Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
+               }
+             }
+             return message;
+           }
+         }) {
+
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
       LOGGER.debug("reading schema: " + schema);
-
-      try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator)) {
-        VectorLoader vectorLoader = new VectorLoader(root);
-        while (true) {
-          try (ArrowRecordBatch recordBatch = arrowReader.nextRecordBatch()) {
-            if (recordBatch == null) break;
-            List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
-            for (ArrowBuffer arrowBuffer : buffersLayout) {
-              Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
-            }
-            vectorLoader.load(recordBatch);
-          }
-        }
-        validateContent(count, root);
-      }
+      arrowReader.loadNextBatch();
+      Assert.assertEquals(count, root.getRowCount());
+      validateContent(count, root);
     }
   }
 
@@ -158,61 +163,37 @@ public class TestArrowFile extends BaseFileTest {
     int count = COUNT;
 
     // write
-    try (
-        BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
-        MapVector parent = new MapVector("parent", originalVectorAllocator, null)) {
+    try (BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
+         MapVector parent = new MapVector("parent", originalVectorAllocator, null)) {
       writeComplexData(count, parent);
       write(parent.getChild("root"), file, stream);
     }
 
     // read
-    try (
-        BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
-        FileInputStream fileInputStream = new FileInputStream(file);
-        ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator);
-        BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
-        NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null)
-        ) {
-      ArrowFooter footer = arrowReader.readFooter();
-      Schema schema = footer.getSchema();
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         FileInputStream fileInputStream = new FileInputStream(file);
+         ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
       LOGGER.debug("reading schema: " + schema);
 
-      // initialize vectors
-
-      try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator)) {
-        VectorLoader vectorLoader = new VectorLoader(root);
-        List<ArrowBlock> recordBatches = footer.getRecordBatches();
-        for (ArrowBlock rbBlock : recordBatches) {
-          try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
-            vectorLoader.load(recordBatch);
-          }
-          validateComplexContent(count, root);
-        }
+      for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) {
+        arrowReader.loadRecordBatch(rbBlock);
+        Assert.assertEquals(count, root.getRowCount());
+        validateComplexContent(count, root);
       }
     }
 
     // Read from stream.
-    try (
-        BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
-        ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
-        ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator);
-        BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
-        MapVector parent = new MapVector("parent", vectorAllocator, null)
-        ) {
-      arrowReader.init();
-      Schema schema = arrowReader.getSchema();
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
+         ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
       LOGGER.debug("reading schema: " + schema);
-
-      try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator)) {
-        VectorLoader vectorLoader = new VectorLoader(root);
-        while (true) {
-          try (ArrowRecordBatch recordBatch = arrowReader.nextRecordBatch()) {
-            if (recordBatch == null) break;
-            vectorLoader.load(recordBatch);
-          }
-        }
-        validateComplexContent(count, root);
-      }
+      arrowReader.loadNextBatch();
+      Assert.assertEquals(count, root.getRowCount());
+      validateComplexContent(count, root);
     }
   }
 
@@ -223,94 +204,70 @@ public class TestArrowFile extends BaseFileTest {
     int[] counts = { 10, 5 };
 
     // write
-    try (
-        BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
-        MapVector parent = new MapVector("parent", originalVectorAllocator, null);
-        FileOutputStream fileOutputStream = new FileOutputStream(file);) {
+    try (BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
+         MapVector parent = new MapVector("parent", originalVectorAllocator, null);
+         FileOutputStream fileOutputStream = new FileOutputStream(file)){
       writeData(counts[0], parent);
-      VectorUnloader vectorUnloader0 = newVectorUnloader(parent.getChild("root"));
-      Schema schema = vectorUnloader0.getSchema();
-      Assert.assertEquals(2, schema.getFields().size());
-      try (ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
-          ArrowStreamWriter streamWriter = new ArrowStreamWriter(stream, schema)) {
-        try (ArrowRecordBatch recordBatch = vectorUnloader0.getRecordBatch()) {
-          Assert.assertEquals("RB #0", counts[0], recordBatch.getLength());
-          arrowWriter.writeRecordBatch(recordBatch);
-          streamWriter.writeRecordBatch(recordBatch);
-        }
+      VectorSchemaRoot root = new VectorSchemaRoot(parent.getChild("root"));
+
+      try(ArrowFileWriter fileWriter = new ArrowFileWriter(root, null, fileOutputStream.getChannel());
+          ArrowStreamWriter streamWriter = new ArrowStreamWriter(root, null, stream)) {
+        fileWriter.start();
+        streamWriter.start();
+
+        fileWriter.writeBatch();
+        streamWriter.writeBatch();
+
         parent.allocateNew();
         writeData(counts[1], parent); // if we write the same data we don't catch that the metadata is stored in the wrong order.
-        VectorUnloader vectorUnloader1 = newVectorUnloader(parent.getChild("root"));
-        try (ArrowRecordBatch recordBatch = vectorUnloader1.getRecordBatch()) {
-          Assert.assertEquals("RB #1", counts[1], recordBatch.getLength());
-          arrowWriter.writeRecordBatch(recordBatch);
-          streamWriter.writeRecordBatch(recordBatch);
-        }
+        root.setRowCount(counts[1]);
+
+        fileWriter.writeBatch();
+        streamWriter.writeBatch();
+
+        fileWriter.end();
+        streamWriter.end();
       }
     }
 
-    // read
-    try (
-        BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
-        FileInputStream fileInputStream = new FileInputStream(file);
-        ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator);
-        BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
-        MapVector parent = new MapVector("parent", vectorAllocator, null);
-        ) {
-      ArrowFooter footer = arrowReader.readFooter();
-      Schema schema = footer.getSchema();
+    // read file
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         FileInputStream fileInputStream = new FileInputStream(file);
+         ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
       LOGGER.debug("reading schema: " + schema);
       int i = 0;
-      try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator);) {
-        VectorLoader vectorLoader = new VectorLoader(root);
-        List<ArrowBlock> recordBatches = footer.getRecordBatches();
-        Assert.assertEquals(2, recordBatches.size());
-        long previousOffset = 0;
-        for (ArrowBlock rbBlock : recordBatches) {
-          Assert.assertTrue(rbBlock.getOffset() + " > " + previousOffset, rbBlock.getOffset() > previousOffset);
-          previousOffset = rbBlock.getOffset();
-          try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
-            Assert.assertEquals("RB #" + i, counts[i], recordBatch.getLength());
-            List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
-            for (ArrowBuffer arrowBuffer : buffersLayout) {
-              Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
-            }
-            vectorLoader.load(recordBatch);
-            validateContent(counts[i], root);
-          }
-          ++i;
-        }
+      List<ArrowBlock> recordBatches = arrowReader.getRecordBlocks();
+      Assert.assertEquals(2, recordBatches.size());
+      long previousOffset = 0;
+      for (ArrowBlock rbBlock : recordBatches) {
+        Assert.assertTrue(rbBlock.getOffset() + " > " + previousOffset, rbBlock.getOffset() > previousOffset);
+        previousOffset = rbBlock.getOffset();
+        arrowReader.loadRecordBatch(rbBlock);
+        Assert.assertEquals("RB #" + i, counts[i], root.getRowCount());
+        validateContent(counts[i], root);
+        ++i;
       }
     }
 
     // read stream
-    try (
-        BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
-        ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
-        ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator);
-        BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
-        MapVector parent = new MapVector("parent", vectorAllocator, null)
-        ) {
-      arrowReader.init();
-      Schema schema = arrowReader.getSchema();
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
+         ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
       LOGGER.debug("reading schema: " + schema);
       int i = 0;
-      try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator);) {
-        VectorLoader vectorLoader = new VectorLoader(root);
-        for (int n = 0; n < 2; n++) {
-          try (ArrowRecordBatch recordBatch = arrowReader.nextRecordBatch()) {
-            assertTrue(recordBatch != null);
-            Assert.assertEquals("RB #" + i, counts[i], recordBatch.getLength());
-            List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
-            for (ArrowBuffer arrowBuffer : buffersLayout) {
-              Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
-            }
-            vectorLoader.load(recordBatch);
-            validateContent(counts[i], root);
-          }
-          ++i;
-        }
+
+      for (int n = 0; n < 2; n++) {
+        arrowReader.loadNextBatch();
+        Assert.assertEquals("RB #" + i, counts[i], root.getRowCount());
+        validateContent(counts[i], root);
+        ++i;
       }
+      arrowReader.loadNextBatch();
+      Assert.assertEquals(0, root.getRowCount());
     }
   }
 
@@ -319,90 +276,326 @@ public class TestArrowFile extends BaseFileTest {
     File file = new File("target/mytest_write_union.arrow");
     ByteArrayOutputStream stream = new ByteArrayOutputStream();
     int count = COUNT;
-    try (
-        BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
-        NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null)) {
 
+    // write
+    try (BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
+         NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null, null)) {
       writeUnionData(count, parent);
-
-      printVectors(parent.getChildrenFromFields());
-
       validateUnionData(count, new VectorSchemaRoot(parent.getChild("root")));
-
       write(parent.getChild("root"), file, stream);
     }
-    // read
-    try (
-        BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
-        FileInputStream fileInputStream = new FileInputStream(file);
-        ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator);
-        BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
-        ) {
-      ArrowFooter footer = arrowReader.readFooter();
-      Schema schema = footer.getSchema();
+
+    // read file
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         FileInputStream fileInputStream = new FileInputStream(file);
+         ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
       LOGGER.debug("reading schema: " + schema);
+      arrowReader.loadNextBatch();
+      validateUnionData(count, root);
+    }
+
+    // Read from stream.
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
+         ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
+      LOGGER.debug("reading schema: " + schema);
+      arrowReader.loadNextBatch();
+      validateUnionData(count, root);
+    }
+  }
 
-      // initialize vectors
-      try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator);) {
-        VectorLoader vectorLoader = new VectorLoader(root);
-        List<ArrowBlock> recordBatches = footer.getRecordBatches();
-        for (ArrowBlock rbBlock : recordBatches) {
-          try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
-            vectorLoader.load(recordBatch);
-          }
-          validateUnionData(count, root);
-        }
+  @Test
+  public void testWriteReadTiny() throws IOException {
+    File file = new File("target/mytest_write_tiny.arrow");
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+
+    try (VectorSchemaRoot root = VectorSchemaRoot.create(MessageSerializerTest.testSchema(), allocator)) {
+      root.getFieldVectors().get(0).allocateNew();
+      NullableTinyIntVector.Mutator mutator = (NullableTinyIntVector.Mutator) root.getFieldVectors().get(0).getMutator();
+      for (int i = 0; i < 16; i++) {
+        mutator.set(i, i < 8 ? 1 : 0, (byte)(i + 1));
+      }
+      mutator.setValueCount(16);
+      root.setRowCount(16);
+
+      // write file
+      try (FileOutputStream fileOutputStream = new FileOutputStream(file);
+           ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream.getChannel())) {
+        LOGGER.debug("writing schema: " + root.getSchema());
+        arrowWriter.start();
+        arrowWriter.writeBatch();
+        arrowWriter.end();
+      }
+      // write stream
+      try (ArrowStreamWriter arrowWriter = new ArrowStreamWriter(root, null, stream)) {
+        arrowWriter.start();
+        arrowWriter.writeBatch();
+        arrowWriter.end();
       }
     }
 
+    // read file
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("fileReader", 0, Integer.MAX_VALUE);
+         FileInputStream fileInputStream = new FileInputStream(file);
+         ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
+      LOGGER.debug("reading schema: " + schema);
+      arrowReader.loadNextBatch();
+      validateTinyData(root);
+    }
+
     // Read from stream.
-    try (
-        BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
-        ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
-        ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator);
-        BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
-        MapVector parent = new MapVector("parent", vectorAllocator, null)
-        ) {
-      arrowReader.init();
-      Schema schema = arrowReader.getSchema();
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("streamReader", 0, Integer.MAX_VALUE);
+         ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
+         ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
+      LOGGER.debug("reading schema: " + schema);
+      arrowReader.loadNextBatch();
+      validateTinyData(root);
+    }
+  }
+
+  private void validateTinyData(VectorSchemaRoot root) {
+    Assert.assertEquals(16, root.getRowCount());
+    NullableTinyIntVector vector = (NullableTinyIntVector) root.getFieldVectors().get(0);
+    for (int i = 0; i < 16; i++) {
+      if (i < 8) {
+        Assert.assertEquals((byte)(i + 1), vector.getAccessor().get(i));
+      } else {
+        Assert.assertTrue(vector.getAccessor().isNull(i));
+      }
+    }
+  }
+
+  @Test
+  public void testWriteReadDictionary() throws IOException {
+    File file = new File("target/mytest_dict.arrow");
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+
+    // write
+    try (BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
+         NullableVarCharVector vector = new NullableVarCharVector("varchar", originalVectorAllocator, null);
+         NullableVarCharVector dictionaryVector = new NullableVarCharVector("dict", originalVectorAllocator, null)) {
+      vector.allocateNewSafe();
+      NullableVarCharVector.Mutator mutator = vector.getMutator();
+      mutator.set(0, "foo".getBytes(StandardCharsets.UTF_8));
+      mutator.set(1, "bar".getBytes(StandardCharsets.UTF_8));
+      mutator.set(3, "baz".getBytes(StandardCharsets.UTF_8));
+      mutator.set(4, "bar".getBytes(StandardCharsets.UTF_8));
+      mutator.set(5, "baz".getBytes(StandardCharsets.UTF_8));
+      mutator.setValueCount(6);
+
+      dictionaryVector.allocateNewSafe();
+      mutator = dictionaryVector.getMutator();
+      mutator.set(0, "foo".getBytes(StandardCharsets.UTF_8));
+      mutator.set(1, "bar".getBytes(StandardCharsets.UTF_8));
+      mutator.set(2, "baz".getBytes(StandardCharsets.UTF_8));
+      mutator.setValueCount(3);
+
+      Dictionary dictionary = new Dictionary(dictionaryVector, new DictionaryEncoding(1L, false, null));
+      MapDictionaryProvider provider = new MapDictionaryProvider();
+      provider.put(dictionary);
+
+      FieldVector encodedVector = (FieldVector) DictionaryEncoder.encode(vector, dictionary);
+
+      List<Field> fields = ImmutableList.of(encodedVector.getField());
+      List<FieldVector> vectors = ImmutableList.of(encodedVector);
+      VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 6);
+
+      try (FileOutputStream fileOutputStream = new FileOutputStream(file);
+           ArrowFileWriter fileWriter = new ArrowFileWriter(root, provider, fileOutputStream.getChannel());
+           ArrowStreamWriter streamWriter = new ArrowStreamWriter(root, provider, stream)) {
+        LOGGER.debug("writing schema: " + root.getSchema());
+        fileWriter.start();
+        streamWriter.start();
+        fileWriter.writeBatch();
+        streamWriter.writeBatch();
+        fileWriter.end();
+        streamWriter.end();
+      }
+
+      dictionaryVector.close();
+      encodedVector.close();
+    }
+
+    // read from file
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         FileInputStream fileInputStream = new FileInputStream(file);
+         ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
       LOGGER.debug("reading schema: " + schema);
+      arrowReader.loadNextBatch();
+      validateFlatDictionary(root.getFieldVectors().get(0), arrowReader);
+    }
+
+    // Read from stream
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
+         ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
+      LOGGER.debug("reading schema: " + schema);
+      arrowReader.loadNextBatch();
+      validateFlatDictionary(root.getFieldVectors().get(0), arrowReader);
+    }
+  }
+
+  private void validateFlatDictionary(FieldVector vector, DictionaryProvider provider) {
+    Assert.assertNotNull(vector);
+
+    DictionaryEncoding encoding = vector.getField().getDictionary();
+    Assert.assertNotNull(encoding);
+    Assert.assertEquals(1L, encoding.getId());
+
+    FieldVector.Accessor accessor = vector.getAccessor();
+    Assert.assertEquals(6, accessor.getValueCount());
+    Assert.assertEquals(0, accessor.getObject(0));
+    Assert.assertEquals(1, accessor.getObject(1));
+    Assert.assertEquals(null, accessor.getObject(2));
+    Assert.assertEquals(2, accessor.getObject(3));
+    Assert.assertEquals(1, accessor.getObject(4));
+    Assert.assertEquals(2, accessor.getObject(5));
+
+    Dictionary dictionary = provider.lookup(1L);
+    Assert.assertNotNull(dictionary);
+    NullableVarCharVector.Accessor dictionaryAccessor = ((NullableVarCharVector) dictionary.getVector()).getAccessor();
+    Assert.assertEquals(3, dictionaryAccessor.getValueCount());
+    Assert.assertEquals(new Text("foo"), dictionaryAccessor.getObject(0));
+    Assert.assertEquals(new Text("bar"), dictionaryAccessor.getObject(1));
+    Assert.assertEquals(new Text("baz"), dictionaryAccessor.getObject(2));
+  }
 
-      try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator)) {
-        VectorLoader vectorLoader = new VectorLoader(root);
-        while (true) {
-          try (ArrowRecordBatch recordBatch = arrowReader.nextRecordBatch()) {
-            if (recordBatch == null) break;
-            vectorLoader.load(recordBatch);
-          }
-        }
-        validateUnionData(count, root);
+  @Test
+  public void testWriteReadNestedDictionary() throws IOException {
+    File file = new File("target/mytest_dict_nested.arrow");
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+
+    DictionaryEncoding encoding = new DictionaryEncoding(2L, false, null);
+
+    // data being written:
+    // [['foo', 'bar'], ['foo'], ['bar']] -> [[0, 1], [0], [1]]
+
+    // write
+    try (NullableVarCharVector dictionaryVector = new NullableVarCharVector("dictionary", allocator, null);
+         ListVector listVector = new ListVector("list", allocator, null, null)) {
+
+      Dictionary dictionary = new Dictionary(dictionaryVector, encoding);
+      MapDictionaryProvider provider = new MapDictionaryProvider();
+      provider.put(dictionary);
+
+      dictionaryVector.allocateNew();
+      dictionaryVector.getMutator().set(0, "foo".getBytes(StandardCharsets.UTF_8));
+      dictionaryVector.getMutator().set(1, "bar".getBytes(StandardCharsets.UTF_8));
+      dictionaryVector.getMutator().setValueCount(2);
+
+      listVector.addOrGetVector(MinorType.INT, encoding);
+      listVector.allocateNew();
+      UnionListWriter listWriter = new UnionListWriter(listVector);
+      listWriter.startList();
+      listWriter.writeInt(0);
+      listWriter.writeInt(1);
+      listWriter.endList();
+      listWriter.startList();
+      listWriter.writeInt(0);
+      listWriter.endList();
+      listWriter.startList();
+      listWriter.writeInt(1);
+      listWriter.endList();
+      listWriter.setValueCount(3);
+
+      List<Field> fields = ImmutableList.of(listVector.getField());
+      List<FieldVector> vectors = ImmutableList.of((FieldVector) listVector);
+      VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 3);
+
+      try (
+           FileOutputStream fileOutputStream = new FileOutputStream(file);
+           ArrowFileWriter fileWriter = new ArrowFileWriter(root, provider, fileOutputStream.getChannel());
+           ArrowStreamWriter streamWriter = new ArrowStreamWriter(root, provider, stream)) {
+        LOGGER.debug("writing schema: " + root.getSchema());
+        fileWriter.start();
+        streamWriter.start();
+        fileWriter.writeBatch();
+        streamWriter.writeBatch();
+        fileWriter.end();
+        streamWriter.end();
       }
     }
+
+    // read from file
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         FileInputStream fileInputStream = new FileInputStream(file);
+         ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
+      LOGGER.debug("reading schema: " + schema);
+      arrowReader.loadNextBatch();
+      validateNestedDictionary((ListVector) root.getFieldVectors().get(0), arrowReader);
+    }
+
+    // Read from stream
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
+         ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
+      LOGGER.debug("reading schema: " + schema);
+      arrowReader.loadNextBatch();
+      validateNestedDictionary((ListVector) root.getFieldVectors().get(0), arrowReader);
+    }
+  }
+
+  private void validateNestedDictionary(ListVector vector, DictionaryProvider provider) {
+    Assert.assertNotNull(vector);
+    Assert.assertNull(vector.getField().getDictionary());
+    Field nestedField = vector.getField().getChildren().get(0);
+
+    DictionaryEncoding encoding = nestedField.getDictionary();
+    Assert.assertNotNull(encoding);
+    Assert.assertEquals(2L, encoding.getId());
+    Assert.assertEquals(new Int(32, true), encoding.getIndexType());
+
+    ListVector.Accessor accessor = vector.getAccessor();
+    Assert.assertEquals(3, accessor.getValueCount());
+    Assert.assertEquals(Arrays.asList(0, 1), accessor.getObject(0));
+    Assert.assertEquals(Arrays.asList(0), accessor.getObject(1));
+    Assert.assertEquals(Arrays.asList(1), accessor.getObject(2));
+
+    Dictionary dictionary = provider.lookup(2L);
+    Assert.assertNotNull(dictionary);
+    NullableVarCharVector.Accessor dictionaryAccessor = ((NullableVarCharVector) dictionary.getVector()).getAccessor();
+    Assert.assertEquals(2, dictionaryAccessor.getValueCount());
+    Assert.assertEquals(new Text("foo"), dictionaryAccessor.getObject(0));
+    Assert.assertEquals(new Text("bar"), dictionaryAccessor.getObject(1));
   }
 
   /**
    * Writes the contents of parents to file. If outStream is non-null, also writes it
    * to outStream in the streaming serialized format.
    */
-  private void write(FieldVector parent, File file, OutputStream outStream) throws FileNotFoundException, IOException {
-    VectorUnloader vectorUnloader = newVectorUnloader(parent);
-    Schema schema = vectorUnloader.getSchema();
-    LOGGER.debug("writing schema: " + schema);
-    try (
-        FileOutputStream fileOutputStream = new FileOutputStream(file);
-        ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
-        ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
-            ) {
-      arrowWriter.writeRecordBatch(recordBatch);
+  private void write(FieldVector parent, File file, OutputStream outStream) throws IOException {
+    VectorSchemaRoot root = new VectorSchemaRoot(parent);
+
+    try (FileOutputStream fileOutputStream = new FileOutputStream(file);
+         ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream.getChannel());) {
+      LOGGER.debug("writing schema: " + root.getSchema());
+      arrowWriter.start();
+      arrowWriter.writeBatch();
+      arrowWriter.end();
     }
 
     // Also try serializing to the stream writer.
     if (outStream != null) {
-      try (
-          ArrowStreamWriter arrowWriter = new ArrowStreamWriter(outStream, schema);
-          ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
-          ) {
-        arrowWriter.writeRecordBatch(recordBatch);
+      try (ArrowStreamWriter arrowWriter = new ArrowStreamWriter(root, null, outStream)) {
+        arrowWriter.start();
+        arrowWriter.writeBatch();
+        arrowWriter.end();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java
index 13b04de..914dfe4 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java
@@ -17,12 +17,15 @@
  */
 package org.apache.arrow.vector.file;
 
+import static java.nio.channels.Channels.newChannel;
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
@@ -34,8 +37,14 @@ import org.apache.arrow.flatbuf.Message;
 import org.apache.arrow.flatbuf.RecordBatch;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.NullableIntVector;
+import org.apache.arrow.vector.NullableTinyIntVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.schema.ArrowFieldNode;
 import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
@@ -69,12 +78,17 @@ public class TestArrowReaderWriter {
   @Test
   public void test() throws IOException {
     Schema schema = new Schema(asList(new Field("testField", true, new ArrowType.Int(8, true), Collections.<Field>emptyList())));
-    byte[] validity = new byte[] { (byte)255, 0};
+    MinorType minorType = Types.getMinorTypeForArrowType(schema.getFields().get(0).getType());
+    FieldVector vector = minorType.getNewVector("testField", allocator, null,null);
+    vector.initializeChildrenFromFields(schema.getFields().get(0).getChildren());
+
+    byte[] validity = new byte[] { (byte) 255, 0};
     // second half is "undefined"
     byte[] values = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
 
     ByteArrayOutputStream out = new ByteArrayOutputStream();
-    try (ArrowWriter writer = new ArrowWriter(Channels.newChannel(out), schema)) {
+    try (VectorSchemaRoot root = new VectorSchemaRoot(schema.getFields(), asList(vector), 16);
+         ArrowFileWriter writer = new ArrowFileWriter(root, null, newChannel(out))) {
       ArrowBuf validityb = buf(validity);
       ArrowBuf valuesb =  buf(values);
       writer.writeRecordBatch(new ArrowRecordBatch(16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb)));
@@ -82,15 +96,15 @@ public class TestArrowReaderWriter {
 
     byte[] byteArray = out.toByteArray();
 
-    try (ArrowReader reader = new ArrowReader(new ByteArrayReadableSeekableByteChannel(byteArray), allocator)) {
-      ArrowFooter footer = reader.readFooter();
-      Schema readSchema = footer.getSchema();
+    SeekableReadChannel channel = new SeekableReadChannel(new ByteArrayReadableSeekableByteChannel(byteArray));
+    try (ArrowFileReader reader = new ArrowFileReader(channel, allocator)) {
+      Schema readSchema = reader.getVectorSchemaRoot().getSchema();
       assertEquals(schema, readSchema);
       assertTrue(readSchema.getFields().get(0).getTypeLayout().getVectorTypes().toString(), readSchema.getFields().get(0).getTypeLayout().getVectors().size() > 0);
       // TODO: dictionaries
-      List<ArrowBlock> recordBatches = footer.getRecordBatches();
+      List<ArrowBlock> recordBatches = reader.getRecordBlocks();
       assertEquals(1, recordBatches.size());
-      ArrowRecordBatch recordBatch = reader.readRecordBatch(recordBatches.get(0));
+      ArrowRecordBatch recordBatch = (ArrowRecordBatch) reader.readMessage(channel, allocator);
       List<ArrowFieldNode> nodes = recordBatch.getNodes();
       assertEquals(1, nodes.size());
       ArrowFieldNode node = nodes.get(0);