You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/03/16 18:00:03 UTC

[4/4] arrow git commit: ARROW-542: Adding dictionary encoding to FileWriter

ARROW-542: Adding dictionary encoding to FileWriter

WIP for comments

Author: Emilio Lahr-Vivaz <el...@ccri.com>
Author: Wes McKinney <we...@twosigma.com>

Closes #334 from elahrvivaz/ARROW-542 and squashes the following commits:

5339730 [Emilio Lahr-Vivaz] fixing bitvector load of value count, adding struct integration test
00d78d3 [Emilio Lahr-Vivaz] fixing set bit validity value in NullableMapVector load
1679934 [Emilio Lahr-Vivaz] cleaning up license
70639e0 [Emilio Lahr-Vivaz] restoring vector loader test
bde4eee [Wes McKinney] Handle 0-length message indicator for EOS in C++ StreamReader
a24854b [Emilio Lahr-Vivaz] fixing StreamToFile conversion
2ee7cfb [Emilio Lahr-Vivaz] fixing FileToStream conversion
adec200 [Emilio Lahr-Vivaz] making arrow magic static, cleanup
8366288 [Emilio Lahr-Vivaz] making magic array private
127937f [Emilio Lahr-Vivaz] removing qualifier for magic
db9a007 [Emilio Lahr-Vivaz] adding dictionary tests to echo server
95c7b2a [Emilio Lahr-Vivaz] cleanup
45caa02 [Emilio Lahr-Vivaz] reverting basewriter dictionary methods
682db6f [Emilio Lahr-Vivaz] cleanup
a1508b9 [Emilio Lahr-Vivaz] removing dictionary vector method (instead use field.dictionary)
43c28af [Emilio Lahr-Vivaz] adding test for nested dictionary encoded list
92a1e6f [Emilio Lahr-Vivaz] fixing imports
e567564 [Emilio Lahr-Vivaz] adding field size check in vectorschemaroot
568fda5 [Emilio Lahr-Vivaz] imports, formatting
363308e [Emilio Lahr-Vivaz] fixing tests
2f69be1 [Emilio Lahr-Vivaz] not passing around dictionary vectors with dictionary fields, adding dictionary encoding to fields, restoring vector loader/unloader
e5c8e02 [Emilio Lahr-Vivaz] Merging dictionary unloader/loader with arrow writer/reader; creating base class for stream/file writer; creating base class with visitors for arrow messages; indentation fixes; other cleanup
d095f3f [Emilio Lahr-Vivaz] ARROW-542: Adding dictionary encoding to file and stream writing


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/49f666e7
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/49f666e7
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/49f666e7

Branch: refs/heads/master
Commit: 49f666e740208d1e6167537f141f27b6b78b77cb
Parents: 3b65001
Author: Emilio Lahr-Vivaz <el...@ccri.com>
Authored: Thu Mar 16 13:59:53 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Thu Mar 16 13:59:53 2017 -0400

----------------------------------------------------------------------
 cpp/src/arrow/ipc/reader.cc                     |   6 +
 integration/integration_test.py                 |   4 +
 .../java/org/apache/arrow/tools/EchoServer.java |  48 +-
 .../org/apache/arrow/tools/FileRoundtrip.java   |  48 +-
 .../org/apache/arrow/tools/FileToStream.java    |  27 +-
 .../org/apache/arrow/tools/Integration.java     |  83 +--
 .../org/apache/arrow/tools/StreamToFile.java    |  19 +-
 .../arrow/tools/ArrowFileTestFixtures.java      |  51 +-
 .../org/apache/arrow/tools/EchoServerTest.java  | 280 ++++++--
 .../org/apache/arrow/tools/TestIntegration.java |  38 +-
 java/tools/tmptestfilesio                       | Bin 0 -> 628 bytes
 .../src/main/codegen/templates/MapWriters.java  |   8 +-
 .../codegen/templates/NullableValueVectors.java |  40 +-
 .../src/main/codegen/templates/UnionVector.java |  10 +-
 .../java/org/apache/arrow/vector/BitVector.java |   2 +-
 .../org/apache/arrow/vector/FieldVector.java    |   4 +-
 .../org/apache/arrow/vector/VectorLoader.java   |  13 +-
 .../apache/arrow/vector/VectorSchemaRoot.java   |  32 +-
 .../org/apache/arrow/vector/VectorUnloader.java |  27 +-
 .../vector/complex/AbstractContainerVector.java |   3 +-
 .../arrow/vector/complex/AbstractMapVector.java |   9 +-
 .../vector/complex/BaseRepeatedValueVector.java |   5 +-
 .../arrow/vector/complex/DictionaryVector.java  | 229 -------
 .../apache/arrow/vector/complex/ListVector.java |  26 +-
 .../apache/arrow/vector/complex/MapVector.java  |   5 +-
 .../arrow/vector/complex/NullableMapVector.java |   9 +-
 .../vector/complex/impl/ComplexWriterImpl.java  |   6 +-
 .../vector/complex/impl/PromotableWriter.java   |   5 +-
 .../arrow/vector/dictionary/Dictionary.java     |  66 ++
 .../vector/dictionary/DictionaryEncoder.java    | 144 ++++
 .../vector/dictionary/DictionaryProvider.java   |  47 ++
 .../arrow/vector/file/ArrowFileReader.java      | 142 ++++
 .../arrow/vector/file/ArrowFileWriter.java      |  59 ++
 .../apache/arrow/vector/file/ArrowFooter.java   |   1 -
 .../apache/arrow/vector/file/ArrowMagic.java    |  37 ++
 .../apache/arrow/vector/file/ArrowReader.java   | 222 +++++--
 .../apache/arrow/vector/file/ArrowWriter.java   | 173 +++--
 .../apache/arrow/vector/file/ReadChannel.java   |  11 +-
 .../arrow/vector/file/SeekableReadChannel.java  |  39 ++
 .../apache/arrow/vector/file/WriteChannel.java  |   7 +-
 .../arrow/vector/file/json/JsonFileReader.java  |  26 +-
 .../vector/schema/ArrowDictionaryBatch.java     |  60 ++
 .../arrow/vector/schema/ArrowMessage.java       |  30 +
 .../arrow/vector/schema/ArrowRecordBatch.java   |   8 +-
 .../arrow/vector/stream/ArrowStreamReader.java  |  88 +--
 .../arrow/vector/stream/ArrowStreamWriter.java  |  75 +--
 .../arrow/vector/stream/MessageSerializer.java  | 164 ++++-
 .../apache/arrow/vector/types/Dictionary.java   |  40 --
 .../org/apache/arrow/vector/types/Types.java    | 114 ++--
 .../vector/types/pojo/DictionaryEncoding.java   |  51 ++
 .../apache/arrow/vector/types/pojo/Field.java   |  59 +-
 .../apache/arrow/vector/TestDecimalVector.java  |   2 +-
 .../arrow/vector/TestDictionaryVector.java      |  82 +--
 .../org/apache/arrow/vector/TestListVector.java |   4 +-
 .../apache/arrow/vector/TestValueVector.java    |  12 +-
 .../arrow/vector/TestVectorUnloadLoad.java      |  22 +-
 .../complex/impl/TestPromotableWriter.java      |   2 +-
 .../complex/writer/TestComplexWriter.java       |  14 +-
 .../apache/arrow/vector/file/TestArrowFile.java | 665 ++++++++++++-------
 .../vector/file/TestArrowReaderWriter.java      |  28 +-
 .../arrow/vector/file/TestArrowStream.java      | 102 +++
 .../arrow/vector/file/TestArrowStreamPipe.java  | 163 +++++
 .../arrow/vector/file/json/TestJSONFile.java    |   4 +-
 .../vector/stream/MessageSerializerTest.java    |   8 +-
 .../arrow/vector/stream/TestArrowStream.java    |  96 ---
 .../vector/stream/TestArrowStreamPipe.java      | 129 ----
 66 files changed, 2522 insertions(+), 1511 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/cpp/src/arrow/ipc/reader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 9734166..4cb5f6c 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -78,6 +78,12 @@ class StreamReader::StreamReaderImpl {
 
     int32_t message_length = *reinterpret_cast<const int32_t*>(buffer->data());
 
+    if (message_length == 0) {
+      // Optional 0 EOS control message
+      *message = nullptr;
+      return Status::OK();
+    }
+
     RETURN_NOT_OK(stream_->Read(message_length, &buffer));
     if (buffer->size() != message_length) {
       return Status::IOError("Unexpected end of stream trying to read message");

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/integration/integration_test.py
----------------------------------------------------------------------
diff --git a/integration/integration_test.py b/integration/integration_test.py
index 049436a..5cd63c5 100644
--- a/integration/integration_test.py
+++ b/integration/integration_test.py
@@ -680,12 +680,16 @@ class JavaTester(Tester):
         cmd = ['java', '-cp', self.ARROW_TOOLS_JAR,
                'org.apache.arrow.tools.StreamToFile',
                stream_path, file_path]
+        if self.debug:
+            print(' '.join(cmd))
         run_cmd(cmd)
 
     def file_to_stream(self, file_path, stream_path):
         cmd = ['java', '-cp', self.ARROW_TOOLS_JAR,
                'org.apache.arrow.tools.FileToStream',
                file_path, stream_path]
+        if self.debug:
+            print(' '.join(cmd))
         run_cmd(cmd)
 
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
----------------------------------------------------------------------
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
index c00620e..7c0cadd 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
@@ -18,23 +18,19 @@
 package org.apache.arrow.tools;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.ServerSocket;
 import java.net.Socket;
-import java.util.ArrayList;
-import java.util.List;
+
+import com.google.common.base.Preconditions;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.stream.ArrowStreamReader;
 import org.apache.arrow.vector.stream.ArrowStreamWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
 public class EchoServer {
   private static final Logger LOGGER = LoggerFactory.getLogger(EchoServer.class);
 
@@ -57,30 +53,28 @@ public class EchoServer {
 
     public void run() throws IOException {
       BufferAllocator  allocator = new RootAllocator(Long.MAX_VALUE);
-      List<ArrowRecordBatch> batches = new ArrayList<ArrowRecordBatch>();
-      try (
-        InputStream in = socket.getInputStream();
-        OutputStream out = socket.getOutputStream();
-        ArrowStreamReader reader = new ArrowStreamReader(in, allocator);
-      ) {
-        // Read the entire input stream.
-        reader.init();
-        while (true) {
-          ArrowRecordBatch batch = reader.nextRecordBatch();
-          if (batch == null) break;
-          batches.add(batch);
-        }
-        LOGGER.info(String.format("Received %d batches", batches.size()));
-
-        // Write it back
-        try (ArrowStreamWriter writer = new ArrowStreamWriter(out, reader.getSchema())) {
-          for (ArrowRecordBatch batch: batches) {
-            writer.writeRecordBatch(batch);
+      // Read the entire input stream and write it back
+      try (ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) {
+        VectorSchemaRoot root = reader.getVectorSchemaRoot();
+        // load the first batch before instantiating the writer so that we have any dictionaries
+        reader.loadNextBatch();
+        try (ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, socket.getOutputStream())) {
+          writer.start();
+          int echoed = 0;
+          while (true) {
+            int rowCount = reader.getVectorSchemaRoot().getRowCount();
+            if (rowCount == 0) {
+              break;
+            } else {
+              writer.writeBatch();
+              echoed += rowCount;
+              reader.loadNextBatch();
+            }
           }
           writer.end();
           Preconditions.checkState(reader.bytesRead() == writer.bytesWritten());
+          LOGGER.info(String.format("Echoed %d records", echoed));
         }
-        LOGGER.info("Done writing stream back.");
       }
     }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
----------------------------------------------------------------------
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
index db7a1c2..9fa7b76 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
@@ -23,18 +23,12 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
-import java.util.List;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.VectorLoader;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.VectorUnloader;
-import org.apache.arrow.vector.file.ArrowBlock;
-import org.apache.arrow.vector.file.ArrowFooter;
-import org.apache.arrow.vector.file.ArrowReader;
-import org.apache.arrow.vector.file.ArrowWriter;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.file.ArrowFileReader;
+import org.apache.arrow.vector.file.ArrowFileWriter;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -86,35 +80,27 @@ public class FileRoundtrip {
       File inFile = validateFile("input", inFileName);
       File outFile = validateFile("output", outFileName);
       BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); // TODO: close
-      try(
-          FileInputStream fileInputStream = new FileInputStream(inFile);
-          ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);) {
+      try (FileInputStream fileInputStream = new FileInputStream(inFile);
+           ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) {
 
-        ArrowFooter footer = arrowReader.readFooter();
-        Schema schema = footer.getSchema();
+        VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+        Schema schema = root.getSchema();
         LOGGER.debug("Input file size: " + inFile.length());
         LOGGER.debug("Found schema: " + schema);
 
-        try (
-            FileOutputStream fileOutputStream = new FileOutputStream(outFile);
-            ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
-            ) {
-
-          // initialize vectors
-
-          List<ArrowBlock> recordBatches = footer.getRecordBatches();
-          for (ArrowBlock rbBlock : recordBatches) {
-            try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock);
-                VectorSchemaRoot root = new VectorSchemaRoot(schema, allocator);) {
-
-              VectorLoader vectorLoader = new VectorLoader(root);
-              vectorLoader.load(inRecordBatch);
-
-              VectorUnloader vectorUnloader = new VectorUnloader(root);
-              ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
-              arrowWriter.writeRecordBatch(recordBatch);
+        try (FileOutputStream fileOutputStream = new FileOutputStream(outFile);
+             ArrowFileWriter arrowWriter = new ArrowFileWriter(root, arrowReader, fileOutputStream.getChannel())) {
+          arrowWriter.start();
+          while (true) {
+            arrowReader.loadNextBatch();
+            int loaded = root.getRowCount();
+            if (loaded == 0) {
+              break;
+            } else {
+              arrowWriter.writeBatch();
             }
           }
+          arrowWriter.end();
         }
         LOGGER.debug("Output file size: " + outFile.length());
       }

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
----------------------------------------------------------------------
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
index ba6505c..d534553 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
@@ -25,10 +25,8 @@ import java.io.OutputStream;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.file.ArrowBlock;
-import org.apache.arrow.vector.file.ArrowFooter;
-import org.apache.arrow.vector.file.ArrowReader;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.file.ArrowFileReader;
 import org.apache.arrow.vector.stream.ArrowStreamWriter;
 
 /**
@@ -36,19 +34,20 @@ import org.apache.arrow.vector.stream.ArrowStreamWriter;
  * first argument and the output is written to standard out.
  */
 public class FileToStream {
+
   public static void convert(FileInputStream in, OutputStream out) throws IOException {
     BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
-    try(
-        ArrowReader reader = new ArrowReader(in.getChannel(), allocator);) {
-      ArrowFooter footer = reader.readFooter();
-      try (
-        ArrowStreamWriter writer = new ArrowStreamWriter(out, footer.getSchema());
-      ) {
-        for (ArrowBlock block: footer.getRecordBatches()) {
-          try (ArrowRecordBatch batch = reader.readRecordBatch(block)) {
-            writer.writeRecordBatch(batch);
-          }
+    try (ArrowFileReader reader = new ArrowFileReader(in.getChannel(), allocator)) {
+      VectorSchemaRoot root = reader.getVectorSchemaRoot();
+      // load the first batch before instantiating the writer so that we have any dictionaries
+      reader.loadNextBatch();
+      try (ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, out)) {
+        writer.start();
+        while (root.getRowCount() > 0) {
+          writer.writeBatch();
+          reader.loadNextBatch();
         }
+        writer.end();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
----------------------------------------------------------------------
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
index 36d4ee5..5d4849c 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
@@ -28,16 +28,12 @@ import java.util.List;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.VectorLoader;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.VectorUnloader;
 import org.apache.arrow.vector.file.ArrowBlock;
-import org.apache.arrow.vector.file.ArrowFooter;
-import org.apache.arrow.vector.file.ArrowReader;
-import org.apache.arrow.vector.file.ArrowWriter;
+import org.apache.arrow.vector.file.ArrowFileReader;
+import org.apache.arrow.vector.file.ArrowFileWriter;
 import org.apache.arrow.vector.file.json.JsonFileReader;
 import org.apache.arrow.vector.file.json.JsonFileWriter;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.arrow.vector.util.Validator;
 import org.apache.commons.cli.CommandLine;
@@ -69,24 +65,18 @@ public class Integration {
     ARROW_TO_JSON(true, false) {
       @Override
       public void execute(File arrowFile, File jsonFile) throws IOException {
-        try(
-            BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+        try(BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
             FileInputStream fileInputStream = new FileInputStream(arrowFile);
-            ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);) {
-          ArrowFooter footer = arrowReader.readFooter();
-          Schema schema = footer.getSchema();
+            ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) {
+          VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+          Schema schema = root.getSchema();
           LOGGER.debug("Input file size: " + arrowFile.length());
           LOGGER.debug("Found schema: " + schema);
-          try (JsonFileWriter writer = new JsonFileWriter(jsonFile, JsonFileWriter.config().pretty(true));) {
+          try (JsonFileWriter writer = new JsonFileWriter(jsonFile, JsonFileWriter.config().pretty(true))) {
             writer.start(schema);
-            List<ArrowBlock> recordBatches = footer.getRecordBatches();
-            for (ArrowBlock rbBlock : recordBatches) {
-              try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock);
-                  VectorSchemaRoot root = new VectorSchemaRoot(schema, allocator);) {
-                VectorLoader vectorLoader = new VectorLoader(root);
-                vectorLoader.load(inRecordBatch);
-                writer.write(root);
-              }
+            for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) {
+              arrowReader.loadRecordBatch(rbBlock);
+              writer.write(root);
             }
           }
           LOGGER.debug("Output file size: " + jsonFile.length());
@@ -96,27 +86,22 @@ public class Integration {
     JSON_TO_ARROW(false, true) {
       @Override
       public void execute(File arrowFile, File jsonFile) throws IOException {
-        try (
-            BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
-            JsonFileReader reader = new JsonFileReader(jsonFile, allocator);
-            ) {
+        try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+             JsonFileReader reader = new JsonFileReader(jsonFile, allocator)) {
           Schema schema = reader.start();
           LOGGER.debug("Input file size: " + jsonFile.length());
           LOGGER.debug("Found schema: " + schema);
-          try (
-              FileOutputStream fileOutputStream = new FileOutputStream(arrowFile);
-              ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
-              ) {
-
-            // initialize vectors
-            VectorSchemaRoot root;
-            while ((root = reader.read()) != null) {
-              VectorUnloader vectorUnloader = new VectorUnloader(root);
-              try (ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();) {
-                arrowWriter.writeRecordBatch(recordBatch);
-              }
-              root.close();
+          try (FileOutputStream fileOutputStream = new FileOutputStream(arrowFile);
+               VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
+               // TODO json dictionaries
+               ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream.getChannel())) {
+            arrowWriter.start();
+            reader.read(root);
+            while (root.getRowCount() != 0) {
+              arrowWriter.writeBatch();
+              reader.read(root);
             }
+            arrowWriter.end();
           }
           LOGGER.debug("Output file size: " + arrowFile.length());
         }
@@ -125,32 +110,26 @@ public class Integration {
     VALIDATE(true, true) {
       @Override
       public void execute(File arrowFile, File jsonFile) throws IOException {
-        try (
-            BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
-            JsonFileReader jsonReader = new JsonFileReader(jsonFile, allocator);
-            FileInputStream fileInputStream = new FileInputStream(arrowFile);
-            ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);
-            ) {
+        try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+             JsonFileReader jsonReader = new JsonFileReader(jsonFile, allocator);
+             FileInputStream fileInputStream = new FileInputStream(arrowFile);
+             ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) {
           Schema jsonSchema = jsonReader.start();
-          ArrowFooter footer = arrowReader.readFooter();
-          Schema arrowSchema = footer.getSchema();
+          VectorSchemaRoot arrowRoot = arrowReader.getVectorSchemaRoot();
+          Schema arrowSchema = arrowRoot.getSchema();
           LOGGER.debug("Arrow Input file size: " + arrowFile.length());
           LOGGER.debug("ARROW schema: " + arrowSchema);
           LOGGER.debug("JSON Input file size: " + jsonFile.length());
           LOGGER.debug("JSON schema: " + jsonSchema);
           Validator.compareSchemas(jsonSchema, arrowSchema);
 
-          List<ArrowBlock> recordBatches = footer.getRecordBatches();
+          List<ArrowBlock> recordBatches = arrowReader.getRecordBlocks();
           Iterator<ArrowBlock> iterator = recordBatches.iterator();
           VectorSchemaRoot jsonRoot;
           while ((jsonRoot = jsonReader.read()) != null && iterator.hasNext()) {
             ArrowBlock rbBlock = iterator.next();
-            try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock);
-                VectorSchemaRoot arrowRoot = new VectorSchemaRoot(arrowSchema, allocator);) {
-              VectorLoader vectorLoader = new VectorLoader(arrowRoot);
-              vectorLoader.load(inRecordBatch);
-              Validator.compareVectorSchemaRoot(arrowRoot, jsonRoot);
-            }
+            arrowReader.loadRecordBatch(rbBlock);
+            Validator.compareVectorSchemaRoot(arrowRoot, jsonRoot);
             jsonRoot.close();
           }
           boolean hasMoreJSON = jsonRoot != null;

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
----------------------------------------------------------------------
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
index c8a5c89..3b79d5b 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
@@ -27,8 +27,8 @@ import java.nio.channels.Channels;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.file.ArrowWriter;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.file.ArrowFileWriter;
 import org.apache.arrow.vector.stream.ArrowStreamReader;
 
 /**
@@ -38,13 +38,16 @@ public class StreamToFile {
   public static void convert(InputStream in, OutputStream out) throws IOException {
     BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
     try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) {
-      reader.init();
-      try (ArrowWriter writer = new ArrowWriter(Channels.newChannel(out), reader.getSchema());) {
-        while (true) {
-          ArrowRecordBatch batch = reader.nextRecordBatch();
-          if (batch == null) break;
-          writer.writeRecordBatch(batch);
+      VectorSchemaRoot root = reader.getVectorSchemaRoot();
+      // load the first batch before instantiating the writer so that we have any dictionaries
+      reader.loadNextBatch();
+      try (ArrowFileWriter writer = new ArrowFileWriter(root, reader, Channels.newChannel(out))) {
+        writer.start();
+        while (root.getRowCount() > 0) {
+          writer.writeBatch();
+          reader.loadNextBatch();
         }
+        writer.end();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java
----------------------------------------------------------------------
diff --git a/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java b/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java
index 4cfc52f..f752f7e 100644
--- a/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java
+++ b/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java
@@ -23,13 +23,10 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.List;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.VectorLoader;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.VectorUnloader;
 import org.apache.arrow.vector.complex.MapVector;
 import org.apache.arrow.vector.complex.impl.ComplexWriterImpl;
 import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter;
@@ -37,10 +34,8 @@ import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
 import org.apache.arrow.vector.complex.writer.BigIntWriter;
 import org.apache.arrow.vector.complex.writer.IntWriter;
 import org.apache.arrow.vector.file.ArrowBlock;
-import org.apache.arrow.vector.file.ArrowFooter;
-import org.apache.arrow.vector.file.ArrowReader;
-import org.apache.arrow.vector.file.ArrowWriter;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.file.ArrowFileReader;
+import org.apache.arrow.vector.file.ArrowFileWriter;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.junit.Assert;
 
@@ -63,26 +58,14 @@ public class ArrowFileTestFixtures {
 
   static void validateOutput(File testOutFile, BufferAllocator allocator) throws Exception {
     // read
-    try (
-        BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
-        FileInputStream fileInputStream = new FileInputStream(testOutFile);
-        ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator);
-        BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
-        ) {
-      ArrowFooter footer = arrowReader.readFooter();
-      Schema schema = footer.getSchema();
-
-      // initialize vectors
-      try (VectorSchemaRoot root = new VectorSchemaRoot(schema, readerAllocator)) {
-        VectorLoader vectorLoader = new VectorLoader(root);
-
-        List<ArrowBlock> recordBatches = footer.getRecordBatches();
-        for (ArrowBlock rbBlock : recordBatches) {
-          try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
-            vectorLoader.load(recordBatch);
-          }
-          validateContent(COUNT, root);
-        }
+    try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+         FileInputStream fileInputStream = new FileInputStream(testOutFile);
+         ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) {
+      VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      Schema schema = root.getSchema();
+      for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) {
+        arrowReader.loadRecordBatch(rbBlock);
+        validateContent(COUNT, root);
       }
     }
   }
@@ -96,16 +79,10 @@ public class ArrowFileTestFixtures {
   }
 
   static void write(FieldVector parent, File file) throws FileNotFoundException, IOException {
-    Schema schema = new Schema(parent.getField().getChildren());
-    int valueCount = parent.getAccessor().getValueCount();
-    List<FieldVector> fields = parent.getChildrenFromFields();
-    VectorUnloader vectorUnloader = new VectorUnloader(schema, valueCount, fields);
-    try (
-        FileOutputStream fileOutputStream = new FileOutputStream(file);
-        ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
-        ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
-            ) {
-      arrowWriter.writeRecordBatch(recordBatch);
+    VectorSchemaRoot root = new VectorSchemaRoot(parent);
+    try (FileOutputStream fileOutputStream = new FileOutputStream(file);
+         ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream.getChannel())) {
+      arrowWriter.writeBatch();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
----------------------------------------------------------------------
diff --git a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
index 48d6162..706f8e2 100644
--- a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
+++ b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
@@ -24,106 +24,268 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.net.Socket;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import com.google.common.collect.ImmutableList;
+
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.NullableIntVector;
+import org.apache.arrow.vector.NullableTinyIntVector;
+import org.apache.arrow.vector.NullableVarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.impl.UnionListWriter;
+import org.apache.arrow.vector.dictionary.Dictionary;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider;
 import org.apache.arrow.vector.stream.ArrowStreamReader;
 import org.apache.arrow.vector.stream.ArrowStreamWriter;
+import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.ArrowType.Int;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.Text;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
-import io.netty.buffer.ArrowBuf;
-
 public class EchoServerTest {
-  public static ArrowBuf buf(BufferAllocator alloc, byte[] bytes) {
-    ArrowBuf buffer = alloc.buffer(bytes.length);
-    buffer.writeBytes(bytes);
-    return buffer;
+
+  private static EchoServer server;
+  private static int serverPort;
+  private static Thread serverThread;
+
+  @BeforeClass
+  public static void startEchoServer() throws IOException {
+    server = new EchoServer(0);
+    serverPort = server.port();
+    serverThread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          server.run();
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    };
+    serverThread.start();
   }
 
-  public static byte[] array(ArrowBuf buf) {
-    byte[] bytes = new byte[buf.readableBytes()];
-    buf.readBytes(bytes);
-    return bytes;
+  @AfterClass
+  public static void stopEchoServer() throws IOException, InterruptedException {
+    server.close();
+    serverThread.join();
   }
 
-  private void testEchoServer(int serverPort, Schema schema, List<ArrowRecordBatch> batches)
+  private void testEchoServer(int serverPort,
+                              Field field,
+                              NullableTinyIntVector vector,
+                              int batches)
       throws UnknownHostException, IOException {
     BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
+    VectorSchemaRoot root = new VectorSchemaRoot(asList(field), asList((FieldVector) vector), 0);
     try (Socket socket = new Socket("localhost", serverPort);
-        ArrowStreamWriter writer = new ArrowStreamWriter(socket.getOutputStream(), schema);
+        ArrowStreamWriter writer = new ArrowStreamWriter(root, null, socket.getOutputStream());
         ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), alloc)) {
-      for (ArrowRecordBatch batch: batches) {
-        writer.writeRecordBatch(batch);
+      writer.start();
+      for (int i = 0; i < batches; i++) {
+        vector.allocateNew(16);
+        for (int j = 0; j < 8; j++) {
+          vector.getMutator().set(j, j + i);
+          vector.getMutator().set(j + 8, 0, (byte) (j + i));
+        }
+        vector.getMutator().setValueCount(16);
+        root.setRowCount(16);
+        writer.writeBatch();
       }
       writer.end();
 
-      reader.init();
-      assertEquals(schema, reader.getSchema());
-      for (int i = 0; i < batches.size(); i++) {
-        ArrowRecordBatch result = reader.nextRecordBatch();
-        ArrowRecordBatch expected = batches.get(i);
-        assertTrue(result != null);
-        assertEquals(expected.getBuffers().size(), result.getBuffers().size());
-        for (int j = 0; j < expected.getBuffers().size(); j++) {
-          assertTrue(expected.getBuffers().get(j).compareTo(result.getBuffers().get(j)) == 0);
+      assertEquals(new Schema(asList(field)), reader.getVectorSchemaRoot().getSchema());
+
+      NullableTinyIntVector readVector = (NullableTinyIntVector) reader.getVectorSchemaRoot().getFieldVectors().get(0);
+      for (int i = 0; i < batches; i++) {
+        reader.loadNextBatch();
+        assertEquals(16, reader.getVectorSchemaRoot().getRowCount());
+        assertEquals(16, readVector.getAccessor().getValueCount());
+        for (int j = 0; j < 8; j++) {
+          assertEquals(j + i, readVector.getAccessor().get(j));
+          assertTrue(readVector.getAccessor().isNull(j + 8));
         }
       }
-      ArrowRecordBatch result = reader.nextRecordBatch();
-      assertTrue(result == null);
+      reader.loadNextBatch();
+      assertEquals(0, reader.getVectorSchemaRoot().getRowCount());
       assertEquals(reader.bytesRead(), writer.bytesWritten());
     }
   }
 
   @Test
   public void basicTest() throws InterruptedException, IOException {
-    final EchoServer server = new EchoServer(0);
-    int serverPort = server.port();
-    Thread serverThread = new Thread() {
-      @Override
-      public void run() {
-        try {
-          server.run();
-        } catch (IOException e) {
-          e.printStackTrace();
-        }
-      }
-    };
-    serverThread.start();
-
     BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
-    byte[] validity = new byte[] { (byte)255, 0};
-    byte[] values = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
-    ArrowBuf validityb = buf(alloc, validity);
-    ArrowBuf valuesb =  buf(alloc, values);
-    ArrowRecordBatch batch = new ArrowRecordBatch(
-        16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb));
 
-    Schema schema = new Schema(asList(new Field(
-        "testField", true, new ArrowType.Int(8, true), Collections.<Field>emptyList())));
+    Field field = new Field("testField", true, new ArrowType.Int(8, true), Collections.<Field>emptyList());
+    NullableTinyIntVector vector = new NullableTinyIntVector("testField", alloc, null);
+    Schema schema = new Schema(asList(field));
 
     // Try an empty stream, just the header.
-    testEchoServer(serverPort, schema, new ArrayList<ArrowRecordBatch>());
+    testEchoServer(serverPort, field, vector, 0);
 
     // Try with one batch.
-    List<ArrowRecordBatch> batches = new ArrayList<>();
-    batches.add(batch);
-    testEchoServer(serverPort, schema, batches);
+    testEchoServer(serverPort, field, vector, 1);
 
     // Try with a few
-    for (int i = 0; i < 10; i++) {
-      batches.add(batch);
+    testEchoServer(serverPort, field, vector, 10);
+  }
+
+  @Test
+  public void testFlatDictionary() throws IOException {
+    DictionaryEncoding encoding = new DictionaryEncoding(1L, false, null);
+    try (BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
+         NullableIntVector indices = new NullableIntVector("varchar", alloc, encoding);
+         NullableVarCharVector words = new NullableVarCharVector("dict", alloc, null)) {
+      indices.allocateNewSafe();
+      NullableIntVector.Mutator indexMutator = indices.getMutator();
+      indexMutator.set(0, 0);
+      indexMutator.set(1, 1);
+      indexMutator.set(3, 2);
+      indexMutator.set(4, 1);
+      indexMutator.set(5, 2);
+      indexMutator.setValueCount(6);
+      // slot 2 is never set above; the read side expects it to come back as null
+      words.allocateNewSafe();
+      NullableVarCharVector.Mutator wordMutator = words.getMutator();
+      wordMutator.set(0, "foo".getBytes(StandardCharsets.UTF_8));
+      wordMutator.set(1, "bar".getBytes(StandardCharsets.UTF_8));
+      wordMutator.set(2, "baz".getBytes(StandardCharsets.UTF_8));
+      wordMutator.setValueCount(3);
+      // single-column root with 6 rows holding the encoded indices
+      List<Field> schemaFields = ImmutableList.of(indices.getField());
+      List<FieldVector> columns = ImmutableList.of((FieldVector) indices);
+      VectorSchemaRoot writeRoot = new VectorSchemaRoot(schemaFields, columns, 6);
+      // the dictionary values are handed to the writer through the provider
+      DictionaryProvider provider = new MapDictionaryProvider(new Dictionary(words, encoding));
+
+      try (Socket client = new Socket("localhost", serverPort);
+           ArrowStreamWriter streamWriter = new ArrowStreamWriter(writeRoot, provider, client.getOutputStream());
+           ArrowStreamReader streamReader = new ArrowStreamReader(client.getInputStream(), alloc)) {
+        streamWriter.start();
+        streamWriter.writeBatch();
+        streamWriter.end();
+
+        streamReader.loadNextBatch();
+        VectorSchemaRoot echoedRoot = streamReader.getVectorSchemaRoot();
+        Assert.assertEquals(6, echoedRoot.getRowCount());
+
+        FieldVector echoed = echoedRoot.getFieldVectors().get(0);
+        Assert.assertNotNull(echoed);
+        // the echoed field must still carry dictionary id 1
+        DictionaryEncoding echoedEncoding = echoed.getField().getDictionary();
+        Assert.assertNotNull(echoedEncoding);
+        Assert.assertEquals(1L, echoedEncoding.getId());
+        // the indices come back unchanged, unset slot included
+        FieldVector.Accessor indexAccessor = echoed.getAccessor();
+        Assert.assertEquals(6, indexAccessor.getValueCount());
+        Assert.assertEquals(0, indexAccessor.getObject(0));
+        Assert.assertEquals(1, indexAccessor.getObject(1));
+        Assert.assertNull(indexAccessor.getObject(2));
+        Assert.assertEquals(2, indexAccessor.getObject(3));
+        Assert.assertEquals(1, indexAccessor.getObject(4));
+        Assert.assertEquals(2, indexAccessor.getObject(5));
+        // the dictionary values are retrievable from the reader by id
+        Dictionary echoedDictionary = streamReader.lookup(1L);
+        Assert.assertNotNull(echoedDictionary);
+        NullableVarCharVector.Accessor wordAccessor = ((NullableVarCharVector) echoedDictionary.getVector()).getAccessor();
+        Assert.assertEquals(3, wordAccessor.getValueCount());
+        Assert.assertEquals(new Text("foo"), wordAccessor.getObject(0));
+        Assert.assertEquals(new Text("bar"), wordAccessor.getObject(1));
+        Assert.assertEquals(new Text("baz"), wordAccessor.getObject(2));
+      }
     }
-    testEchoServer(serverPort, schema, batches);
+  }
 
-    server.close();
-    serverThread.join();
+  @Test
+  public void testNestedDictionary() throws IOException {
+    DictionaryEncoding writeEncoding = new DictionaryEncoding(2L, false, null);
+    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+         NullableVarCharVector writeDictionaryVector = new NullableVarCharVector("dictionary", allocator, null);
+         ListVector writeVector = new ListVector("list", allocator, null, null)) {
+
+      // data being written:
+      // [['foo', 'bar'], ['foo'], ['bar']] -> [[0, 1], [0], [1]]
+
+      writeDictionaryVector.allocateNew();
+      writeDictionaryVector.getMutator().set(0, "foo".getBytes(StandardCharsets.UTF_8));
+      writeDictionaryVector.getMutator().set(1, "bar".getBytes(StandardCharsets.UTF_8));
+      writeDictionaryVector.getMutator().setValueCount(2);
+      // the list's child vector holds the INT indices and carries the dictionary encoding
+      writeVector.addOrGetVector(MinorType.INT, writeEncoding);
+      writeVector.allocateNew();
+      UnionListWriter listWriter = new UnionListWriter(writeVector);
+      listWriter.startList();
+      listWriter.writeInt(0);
+      listWriter.writeInt(1);
+      listWriter.endList();
+      listWriter.startList();
+      listWriter.writeInt(0);
+      listWriter.endList();
+      listWriter.startList();
+      listWriter.writeInt(1);
+      listWriter.endList();
+      listWriter.setValueCount(3);
+
+      List<Field> fields = ImmutableList.of(writeVector.getField());
+      List<FieldVector> vectors = ImmutableList.of((FieldVector) writeVector);
+      VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 3);
+
+      DictionaryProvider writeProvider = new MapDictionaryProvider(new Dictionary(writeDictionaryVector, writeEncoding));
+
+      try (Socket socket = new Socket("localhost", serverPort);
+           ArrowStreamWriter writer = new ArrowStreamWriter(root, writeProvider, socket.getOutputStream());
+           ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) {
+        writer.start();
+        writer.writeBatch();
+        writer.end();
+
+        reader.loadNextBatch();
+        VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
+        Assert.assertEquals(3, readerRoot.getRowCount());
+
+        ListVector readVector = (ListVector) readerRoot.getFieldVectors().get(0);
+        Assert.assertNotNull(readVector);
+
+        // the list field itself is not encoded
+        Assert.assertNull(readVector.getField().getDictionary());
+
+        // only the list's single child field carries the dictionary; look it up once
+        Field nestedField = readVector.getField().getChildren().get(0);
+        DictionaryEncoding readEncoding = nestedField.getDictionary();
+        Assert.assertNotNull(readEncoding);
+        Assert.assertEquals(2L, readEncoding.getId());
+
+        // the index type read back is a signed 32-bit int, matching the INT child vector
+        Assert.assertEquals(new Int(32, true), readEncoding.getIndexType());
+
+        ListVector.Accessor accessor = readVector.getAccessor();
+        Assert.assertEquals(3, accessor.getValueCount());
+        Assert.assertEquals(Arrays.asList(0, 1), accessor.getObject(0));
+        Assert.assertEquals(Arrays.asList(0), accessor.getObject(1));
+        Assert.assertEquals(Arrays.asList(1), accessor.getObject(2));
+
+        Dictionary readDictionary = reader.lookup(2L);
+        Assert.assertNotNull(readDictionary);
+        NullableVarCharVector.Accessor dictionaryAccessor = ((NullableVarCharVector) readDictionary.getVector()).getAccessor();
+        Assert.assertEquals(2, dictionaryAccessor.getValueCount());
+        Assert.assertEquals(new Text("foo"), dictionaryAccessor.getObject(0));
+        Assert.assertEquals(new Text("bar"), dictionaryAccessor.getObject(1));
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java
----------------------------------------------------------------------
diff --git a/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java b/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java
index 0ae32be..9d4ef5c 100644
--- a/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java
+++ b/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java
@@ -33,6 +33,11 @@ import java.io.IOException;
 import java.io.StringReader;
 import java.util.Map;
 
+import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
+import com.fasterxml.jackson.core.util.DefaultPrettyPrinter.NopIndenter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.tools.Integration.Command;
@@ -49,11 +54,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
-import com.fasterxml.jackson.core.util.DefaultPrettyPrinter.NopIndenter;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-
 public class TestIntegration {
 
   @Rule
@@ -128,6 +128,46 @@ public class TestIntegration {
     }
   }
 
+  /**
+   * Converts the struct integration example from JSON to an Arrow file and back to JSON, then
+   * checks that the normalized round-tripped output matches the normalized input line for line.
+   *
+   * @throws Exception if a conversion fails or one of the files cannot be read
+   */
+  @Test
+  public void testJSONRoundTripWithStruct() throws Exception {
+    File testJSONFile = new File("../../integration/data/struct_example.json");
+    File testOutFile = testFolder.newFile("testOutStruct.arrow");
+    File testRoundTripJSONFile = testFolder.newFile("testOutStruct.json");
+    // NOTE(review): presumably deleted so each conversion creates its own output file - confirm
+    testOutFile.delete();
+    testRoundTripJSONFile.delete();
+
+    Integration integration = new Integration();
+
+    // convert to arrow
+    String[] args1 = { "-arrow", testOutFile.getAbsolutePath(), "-json",  testJSONFile.getAbsolutePath(), "-command", Command.JSON_TO_ARROW.name()};
+    integration.run(args1);
+
+    // convert back to json
+    String[] args2 = { "-arrow", testOutFile.getAbsolutePath(), "-json",  testRoundTripJSONFile.getAbsolutePath(), "-command", Command.ARROW_TO_JSON.name()};
+    integration.run(args2);
+
+    // try-with-resources closes both readers even when an assertion fails
+    try (BufferedReader orig = readNormalized(testJSONFile);
+         BufferedReader rt = readNormalized(testRoundTripJSONFile)) {
+      String i;
+      int j = 0;
+      while ((i = orig.readLine()) != null) {
+        // if the round-tripped file ends early, readLine() yields null and the assertion fails
+        assertEquals("line: " + j, i, rt.readLine());
+        ++j;
+      }
+      // the round-tripped file must end here too; extra trailing lines are a mismatch
+      assertEquals("unexpected extra line: " + j, null, rt.readLine());
+    }
+  }
+
   private ObjectMapper om = new ObjectMapper();
   {
     DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter();

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/tools/tmptestfilesio
----------------------------------------------------------------------
diff --git a/java/tools/tmptestfilesio b/java/tools/tmptestfilesio
new file mode 100644
index 0000000..d1b6b6c
Binary files /dev/null and b/java/tools/tmptestfilesio differ

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/codegen/templates/MapWriters.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/codegen/templates/MapWriters.java b/java/vector/src/main/codegen/templates/MapWriters.java
index 4af6eee..428ce04 100644
--- a/java/vector/src/main/codegen/templates/MapWriters.java
+++ b/java/vector/src/main/codegen/templates/MapWriters.java
@@ -64,7 +64,7 @@ public class ${mode}MapWriter extends AbstractFieldWriter {
         list(child.getName());
         break;
       case UNION:
-        UnionWriter writer = new UnionWriter(container.addOrGet(child.getName(), MinorType.UNION, UnionVector.class), getNullableMapWriterFactory());
+        UnionWriter writer = new UnionWriter(container.addOrGet(child.getName(), MinorType.UNION, UnionVector.class, null), getNullableMapWriterFactory());
         fields.put(handleCase(child.getName()), writer);
         break;
 <#list vv.types as type><#list type.minor as minor>
@@ -113,7 +113,7 @@ public class ${mode}MapWriter extends AbstractFieldWriter {
     FieldWriter writer = fields.get(finalName);
     if(writer == null){
       int vectorCount=container.size();
-      NullableMapVector vector = container.addOrGet(name, MinorType.MAP, NullableMapVector.class);
+      NullableMapVector vector = container.addOrGet(name, MinorType.MAP, NullableMapVector.class, null);
       writer = new PromotableWriter(vector, container, getNullableMapWriterFactory());
       if(vectorCount != container.size()) {
         writer.allocate();
@@ -157,7 +157,7 @@ public class ${mode}MapWriter extends AbstractFieldWriter {
     FieldWriter writer = fields.get(finalName);
     int vectorCount = container.size();
     if(writer == null) {
-      writer = new PromotableWriter(container.addOrGet(name, MinorType.LIST, ListVector.class), container, getNullableMapWriterFactory());
+      writer = new PromotableWriter(container.addOrGet(name, MinorType.LIST, ListVector.class, null), container, getNullableMapWriterFactory());
       if (container.size() > vectorCount) {
         writer.allocate();
       }
@@ -222,7 +222,7 @@ public class ${mode}MapWriter extends AbstractFieldWriter {
     if(writer == null) {
       ValueVector vector;
       ValueVector currentVector = container.getChild(name);
-      ${vectName}Vector v = container.addOrGet(name, MinorType.${upperName}, ${vectName}Vector.class<#if minor.class == "Decimal"> , new int[] {precision, scale}</#if>);
+      ${vectName}Vector v = container.addOrGet(name, MinorType.${upperName}, ${vectName}Vector.class, null<#if minor.class == "Decimal"> , new int[] {precision, scale}</#if>);
       writer = new PromotableWriter(v, container, getNullableMapWriterFactory());
       vector = v;
       if (currentVector == null || currentVector != vector) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/codegen/templates/NullableValueVectors.java b/java/vector/src/main/codegen/templates/NullableValueVectors.java
index 6b25fb3..b3e10e3 100644
--- a/java/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/java/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -65,21 +65,21 @@ public final class ${className} extends BaseDataValueVector implements <#if type
   private final int precision;
   private final int scale;
 
-  public ${className}(String name, BufferAllocator allocator, int precision, int scale) {
+  public ${className}(String name, BufferAllocator allocator, DictionaryEncoding dictionary, int precision, int scale) {
     super(name, allocator);
     values = new ${valuesName}(valuesField, allocator, precision, scale);
     this.precision = precision;
     this.scale = scale;
     mutator = new Mutator();
     accessor = new Accessor();
-    field = new Field(name, true, new Decimal(precision, scale), null);
+    field = new Field(name, true, new Decimal(precision, scale), dictionary, null);
     innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList(
         bits,
         values
     ));
   }
   <#else>
-  public ${className}(String name, BufferAllocator allocator) {
+  public ${className}(String name, BufferAllocator allocator, DictionaryEncoding dictionary) {
     super(name, allocator);
     values = new ${valuesName}(valuesField, allocator);
     mutator = new Mutator();
@@ -88,38 +88,38 @@ public final class ${className} extends BaseDataValueVector implements <#if type
         minor.class == "SmallInt" ||
         minor.class == "Int" ||
         minor.class == "BigInt">
-    field = new Field(name, true, new Int(${type.width} * 8, true), null);
+    field = new Field(name, true, new Int(${type.width} * 8, true), dictionary, null);
   <#elseif minor.class == "UInt1" ||
         minor.class == "UInt2" ||
         minor.class == "UInt4" ||
         minor.class == "UInt8">
-    field = new Field(name, true, new Int(${type.width} * 8, false), null);
+    field = new Field(name, true, new Int(${type.width} * 8, false), dictionary, null);
   <#elseif minor.class == "Date">
-    field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Date(), null);
+    field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Date(), dictionary, null);
   <#elseif minor.class == "Time">
-    field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Time(), null);
+    field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Time(), dictionary, null);
   <#elseif minor.class == "Float4">
-    field = new Field(name, true, new FloatingPoint(org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE), null);
+    field = new Field(name, true, new FloatingPoint(org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE), dictionary, null);
   <#elseif minor.class == "Float8">
-    field = new Field(name, true, new FloatingPoint(org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE), null);
+    field = new Field(name, true, new FloatingPoint(org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE), dictionary, null);
   <#elseif minor.class == "TimeStampSec">
-    field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.SECOND), null);
+    field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.SECOND), dictionary, null);
   <#elseif minor.class == "TimeStampMilli">
-    field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.MILLISECOND), null);
+    field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.MILLISECOND), dictionary, null);
   <#elseif minor.class == "TimeStampMicro">
-    field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.MICROSECOND), null);
+    field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.MICROSECOND), dictionary, null);
   <#elseif minor.class == "TimeStampNano">
-    field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.NANOSECOND), null);
+    field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.NANOSECOND), dictionary, null);
   <#elseif minor.class == "IntervalDay">
-    field = new Field(name, true, new Interval(org.apache.arrow.vector.types.IntervalUnit.DAY_TIME), null);
+    field = new Field(name, true, new Interval(org.apache.arrow.vector.types.IntervalUnit.DAY_TIME), dictionary, null);
   <#elseif minor.class == "IntervalYear">
-    field = new Field(name, true, new Interval(org.apache.arrow.vector.types.IntervalUnit.YEAR_MONTH), null);
+    field = new Field(name, true, new Interval(org.apache.arrow.vector.types.IntervalUnit.YEAR_MONTH), dictionary, null);
   <#elseif minor.class == "VarChar">
-    field = new Field(name, true, new Utf8(), null);
+    field = new Field(name, true, new Utf8(), dictionary, null);
   <#elseif minor.class == "VarBinary">
-    field = new Field(name, true, new Binary(), null);
+    field = new Field(name, true, new Binary(), dictionary, null);
   <#elseif minor.class == "Bit">
-    field = new Field(name, true, new Bool(), null);
+    field = new Field(name, true, new Bool(), dictionary, null);
   </#if>
     innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList(
         bits,
@@ -378,9 +378,9 @@ public final class ${className} extends BaseDataValueVector implements <#if type
 
     public TransferImpl(String name, BufferAllocator allocator){
       <#if minor.class == "Decimal">
-      to = new ${className}(name, allocator, precision, scale);
+      to = new ${className}(name, allocator, field.getDictionary(), precision, scale);
       <#else>
-      to = new ${className}(name, allocator);
+      to = new ${className}(name, allocator, field.getDictionary());
       </#if>
     }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/codegen/templates/UnionVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/codegen/templates/UnionVector.java b/java/vector/src/main/codegen/templates/UnionVector.java
index 1a6908d..076ed93 100644
--- a/java/vector/src/main/codegen/templates/UnionVector.java
+++ b/java/vector/src/main/codegen/templates/UnionVector.java
@@ -118,11 +118,11 @@ public class UnionVector implements FieldVector {
   public List<BufferBacked> getFieldInnerVectors() {
      return this.innerVectors;
   }
-  
+
   public NullableMapVector getMap() {
     if (mapVector == null) {
       int vectorCount = internalMap.size();
-      mapVector = internalMap.addOrGet("map", MinorType.MAP, NullableMapVector.class);
+      mapVector = internalMap.addOrGet("map", MinorType.MAP, NullableMapVector.class, null);
       if (internalMap.size() > vectorCount) {
         mapVector.allocateNew();
         if (callBack != null) {
@@ -144,7 +144,7 @@ public class UnionVector implements FieldVector {
   public Nullable${name}Vector get${name}Vector() {
     if (${uncappedName}Vector == null) {
       int vectorCount = internalMap.size();
-      ${uncappedName}Vector = internalMap.addOrGet("${lowerCaseName}", MinorType.${name?upper_case}, Nullable${name}Vector.class);
+      ${uncappedName}Vector = internalMap.addOrGet("${lowerCaseName}", MinorType.${name?upper_case}, Nullable${name}Vector.class, null);
       if (internalMap.size() > vectorCount) {
         ${uncappedName}Vector.allocateNew();
         if (callBack != null) {
@@ -162,7 +162,7 @@ public class UnionVector implements FieldVector {
   public ListVector getList() {
     if (listVector == null) {
       int vectorCount = internalMap.size();
-      listVector = internalMap.addOrGet("list", MinorType.LIST, ListVector.class);
+      listVector = internalMap.addOrGet("list", MinorType.LIST, ListVector.class, null);
       if (internalMap.size() > vectorCount) {
         listVector.allocateNew();
         if (callBack != null) {
@@ -262,7 +262,7 @@ public class UnionVector implements FieldVector {
   public FieldVector addVector(FieldVector v) {
     String name = v.getMinorType().name().toLowerCase();
     Preconditions.checkState(internalMap.getChild(name) == null, String.format("%s vector already exists", name));
-    final FieldVector newVector = internalMap.addOrGet(name, v.getMinorType(), v.getClass());
+    final FieldVector newVector = internalMap.addOrGet(name, v.getMinorType(), v.getClass(), v.getField().getDictionary());
     v.makeTransferPair(newVector).transfer();
     internalMap.putChild(name, newVector);
     if (callBack != null) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java
index d1e9abe..179f2ee 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java
@@ -81,6 +81,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
     } else {
       super.load(fieldNode, data);
     }
+    this.valueCount = fieldNode.getLength();
   }
 
   @Override
@@ -451,7 +452,6 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
 
     /**
      * set count bits to 1 in data starting at firstBitIndex
-     * @param data the buffer to set
      * @param firstBitIndex the index of the first bit to set
      * @param count the number of bits to set
      */

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java b/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
index b28433c..0fdbc48 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
@@ -19,11 +19,10 @@ package org.apache.arrow.vector;
 
 import java.util.List;
 
+import io.netty.buffer.ArrowBuf;
 import org.apache.arrow.vector.schema.ArrowFieldNode;
 import org.apache.arrow.vector.types.pojo.Field;
 
-import io.netty.buffer.ArrowBuf;
-
 /**
  * A vector corresponding to a Field in the schema
  * It has inner vectors backed by buffers (validity, offsets, data, ...)
@@ -61,5 +60,4 @@ public interface FieldVector extends ValueVector {
    * @return the inner vectors for this field as defined by the TypeLayout
    */
   List<BufferBacked> getFieldInnerVectors();
-
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
index 5c1176c..76de250 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
@@ -36,15 +36,14 @@ import io.netty.buffer.ArrowBuf;
  * Loads buffers into vectors
  */
 public class VectorLoader {
+
   private final VectorSchemaRoot root;
 
   /**
    * will create children in root based on schema
-   * @param schema the expected schema
    * @param root the root to add vectors to based on schema
    */
   public VectorLoader(VectorSchemaRoot root) {
-    super();
     this.root = root;
   }
 
@@ -57,18 +56,16 @@ public class VectorLoader {
     Iterator<ArrowBuf> buffers = recordBatch.getBuffers().iterator();
     Iterator<ArrowFieldNode> nodes = recordBatch.getNodes().iterator();
     List<Field> fields = root.getSchema().getFields();
-    for (int i = 0; i < fields.size(); ++i) {
-      Field field = fields.get(i);
+    for (Field field: fields) {
       FieldVector fieldVector = root.getVector(field.getName());
       loadBuffers(fieldVector, field, buffers, nodes);
     }
     root.setRowCount(recordBatch.getLength());
     if (nodes.hasNext() || buffers.hasNext()) {
-      throw new IllegalArgumentException("not all nodes and buffers where consumed. nodes: " + Iterators.toString(nodes) + " buffers: " + Iterators.toString(buffers));
+      throw new IllegalArgumentException("not all nodes and buffers were consumed. nodes: " + Iterators.toString(nodes) + " buffers: " + Iterators.toString(buffers));
     }
   }
 
-
   private void loadBuffers(FieldVector vector, Field field, Iterator<ArrowBuf> buffers, Iterator<ArrowFieldNode> nodes) {
     checkArgument(nodes.hasNext(),
         "no more field nodes for for field " + field + " and vector " + vector);
@@ -82,7 +79,7 @@ public class VectorLoader {
       vector.loadFieldBuffers(fieldNode, ownBuffers);
     } catch (RuntimeException e) {
       throw new IllegalArgumentException("Could not load buffers for field " +
-              field + ". error message: " + e.getMessage(), e);
+            field + ". error message: " + e.getMessage(), e);
     }
     List<Field> children = field.getChildren();
     if (children.size() > 0) {
@@ -96,4 +93,4 @@ public class VectorLoader {
     }
   }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java
index 1cbe187..7e626fb 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorSchemaRoot.java
@@ -18,7 +18,6 @@
 package org.apache.arrow.vector;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,6 +28,9 @@ import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
 
+/**
+ * Holder for a set of vectors to be loaded/unloaded
+ */
 public class VectorSchemaRoot implements AutoCloseable {
 
   private final Schema schema;
@@ -37,9 +39,17 @@ public class VectorSchemaRoot implements AutoCloseable {
   private final Map<String, FieldVector> fieldVectorsMap = new HashMap<>();
 
   public VectorSchemaRoot(FieldVector parent) {
-    this.schema = new Schema(parent.getField().getChildren());
-    this.rowCount = parent.getAccessor().getValueCount();
-    this.fieldVectors = parent.getChildrenFromFields();
+    this(parent.getField().getChildren(), parent.getChildrenFromFields(), parent.getAccessor().getValueCount());
+  }
+
+  public VectorSchemaRoot(List<Field> fields, List<FieldVector> fieldVectors, int rowCount) {
+    if (fields.size() != fieldVectors.size()) {
+      throw new IllegalArgumentException("Fields must match field vectors. Found " +
+          fieldVectors.size() + " vectors and " + fields.size() + " fields");
+    }
+    this.schema = new Schema(fields);
+    this.rowCount = rowCount;
+    this.fieldVectors = fieldVectors;
     for (int i = 0; i < schema.getFields().size(); ++i) {
       Field field = schema.getFields().get(i);
       FieldVector vector = fieldVectors.get(i);
@@ -47,21 +57,19 @@ public class VectorSchemaRoot implements AutoCloseable {
     }
   }
 
-  public VectorSchemaRoot(Schema schema, BufferAllocator allocator) {
-    super();
-    this.schema = schema;
+  public static VectorSchemaRoot create(Schema schema, BufferAllocator allocator) {
     List<FieldVector> fieldVectors = new ArrayList<>();
     for (Field field : schema.getFields()) {
       MinorType minorType = Types.getMinorTypeForArrowType(field.getType());
-      FieldVector vector = minorType.getNewVector(field.getName(), allocator, null);
+      FieldVector vector = minorType.getNewVector(field.getName(), allocator, field.getDictionary(), null);
       vector.initializeChildrenFromFields(field.getChildren());
       fieldVectors.add(vector);
-      fieldVectorsMap.put(field.getName(), vector);
     }
-    this.fieldVectors = Collections.unmodifiableList(fieldVectors);
-    if (this.fieldVectors.size() != schema.getFields().size()) {
-      throw new IllegalArgumentException("The root vector did not create the right number of children. found " + fieldVectors.size() + " expected " + schema.getFields().size());
+    if (fieldVectors.size() != schema.getFields().size()) {
+      throw new IllegalArgumentException("The root vector did not create the right number of children. found " +
+        fieldVectors.size() + " expected " + schema.getFields().size());
     }
+    return new VectorSchemaRoot(schema.getFields(), fieldVectors, 0);
   }
 
   public List<FieldVector> getFieldVectors() {

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
index 92d8cb0..8e9ff6d 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
@@ -20,42 +20,27 @@ package org.apache.arrow.vector;
 import java.util.ArrayList;
 import java.util.List;
 
+import io.netty.buffer.ArrowBuf;
 import org.apache.arrow.vector.ValueVector.Accessor;
 import org.apache.arrow.vector.schema.ArrowFieldNode;
 import org.apache.arrow.vector.schema.ArrowRecordBatch;
 import org.apache.arrow.vector.schema.ArrowVectorType;
-import org.apache.arrow.vector.types.pojo.Schema;
-
-import io.netty.buffer.ArrowBuf;
 
 public class VectorUnloader {
 
-  private final Schema schema;
-  private final int valueCount;
-  private final List<FieldVector> vectors;
-
-  public VectorUnloader(Schema schema, int valueCount, List<FieldVector> vectors) {
-    super();
-    this.schema = schema;
-    this.valueCount = valueCount;
-    this.vectors = vectors;
-  }
+  private final VectorSchemaRoot root;
 
   public VectorUnloader(VectorSchemaRoot root) {
-    this(root.getSchema(), root.getRowCount(), root.getFieldVectors());
-  }
-
-  public Schema getSchema() {
-    return schema;
+    this.root = root;
   }
 
   public ArrowRecordBatch getRecordBatch() {
     List<ArrowFieldNode> nodes = new ArrayList<>();
     List<ArrowBuf> buffers = new ArrayList<>();
-    for (FieldVector vector : vectors) {
+    for (FieldVector vector : root.getFieldVectors()) {
       appendNodes(vector, nodes, buffers);
     }
-    return new ArrowRecordBatch(valueCount, nodes, buffers);
+    return new ArrowRecordBatch(root.getRowCount(), nodes, buffers);
   }
 
   private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers) {
@@ -74,4 +59,4 @@ public class VectorUnloader {
     }
   }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
index 2f68886..86a5e82 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
@@ -22,6 +22,7 @@ import org.apache.arrow.memory.OutOfMemoryException;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
 import org.apache.arrow.vector.util.CallBack;
 
 /**
@@ -85,7 +86,7 @@ public abstract class AbstractContainerVector implements ValueVector {
   public abstract int size();
 
   // add a new vector with the input MajorType or return the existing vector if we already added one with the same type
-  public abstract <T extends FieldVector> T addOrGet(String name, MinorType minorType, Class<T> clazz, int... precisionScale);
+  public abstract <T extends FieldVector> T addOrGet(String name, MinorType minorType, Class<T> clazz, DictionaryEncoding dictionary, int... precisionScale);
 
   // return the child vector with the input name
   public abstract <T extends FieldVector> T getChild(String name, Class<T> clazz);

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java
index f030d16..baeeb07 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java
@@ -26,6 +26,7 @@ import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
 import org.apache.arrow.vector.util.CallBack;
 import org.apache.arrow.vector.util.MapWithOrdinal;
 
@@ -110,7 +111,7 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
    * @return resultant {@link org.apache.arrow.vector.ValueVector}
    */
   @Override
-  public <T extends FieldVector> T addOrGet(String name, MinorType minorType, Class<T> clazz, int... precisionScale) {
+  public <T extends FieldVector> T addOrGet(String name, MinorType minorType, Class<T> clazz, DictionaryEncoding dictionary, int... precisionScale) {
     final ValueVector existing = getChild(name);
     boolean create = false;
     if (existing == null) {
@@ -122,7 +123,7 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
       create = true;
     }
     if (create) {
-      final T vector = clazz.cast(minorType.getNewVector(name, allocator, callBack, precisionScale));
+      final T vector = clazz.cast(minorType.getNewVector(name, allocator, dictionary, callBack, precisionScale));
       putChild(name, vector);
       if (callBack!=null) {
         callBack.doWork();
@@ -162,12 +163,12 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
     return typeify(v, clazz);
   }
 
-  protected ValueVector add(String name, MinorType minorType, int... precisionScale) {
+  protected ValueVector add(String name, MinorType minorType, DictionaryEncoding dictionary, int... precisionScale) {
     final ValueVector existing = getChild(name);
     if (existing != null) {
       throw new IllegalStateException(String.format("Vector already exists: Existing[%s], Requested[%s] ", existing.getClass().getSimpleName(), minorType));
     }
-    FieldVector vector = minorType.getNewVector(name, allocator, callBack, precisionScale);
+    FieldVector vector = minorType.getNewVector(name, allocator, dictionary, callBack, precisionScale);
     putChild(name, vector);
     if (callBack!=null) {
       callBack.doWork();

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
index 7424df4..eeb8f58 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
@@ -28,6 +28,7 @@ import org.apache.arrow.vector.UInt4Vector;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.ZeroVector;
 import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
 import org.apache.arrow.vector.util.SchemaChangeRuntimeException;
 
 import com.google.common.base.Preconditions;
@@ -150,10 +151,10 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements
     return vector == DEFAULT_DATA_VECTOR ? 0:1;
   }
 
-  public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(MinorType minorType) {
+  public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(MinorType minorType, DictionaryEncoding dictionary) {
     boolean created = false;
     if (vector instanceof ZeroVector) {
-      vector = minorType.getNewVector(DATA_VECTOR_NAME, allocator, null);
+      vector = minorType.getNewVector(DATA_VECTOR_NAME, allocator, dictionary, null);
       // returned vector must have the same field
       created = true;
     }

http://git-wip-us.apache.org/repos/asf/arrow/blob/49f666e7/java/vector/src/main/java/org/apache/arrow/vector/complex/DictionaryVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/DictionaryVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/DictionaryVector.java
deleted file mode 100644
index 84760ea..0000000
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/DictionaryVector.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*******************************************************************************
-
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.arrow.vector.complex;
-
-import io.netty.buffer.ArrowBuf;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.OutOfMemoryException;
-import org.apache.arrow.vector.NullableIntVector;
-import org.apache.arrow.vector.ValueVector;
-import org.apache.arrow.vector.complex.reader.FieldReader;
-import org.apache.arrow.vector.types.Dictionary;
-import org.apache.arrow.vector.types.Types.MinorType;
-import org.apache.arrow.vector.types.pojo.Field;
-import org.apache.arrow.vector.util.TransferPair;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-public class DictionaryVector implements ValueVector {
-
-  private ValueVector indices;
-  private Dictionary dictionary;
-
-  public DictionaryVector(ValueVector indices, Dictionary dictionary) {
-    this.indices = indices;
-    this.dictionary = dictionary;
-  }
-
-  /**
-   * Dictionary encodes a vector. The dictionary will be built using the values from the vector.
-   *
-   * @param vector vector to encode
-   * @return dictionary encoded vector
-   */
-  public static DictionaryVector encode(ValueVector vector) {
-    validateType(vector.getMinorType());
-    Map<Object, Integer> lookUps = new HashMap<>();
-    Map<Integer, Integer> transfers = new HashMap<>();
-
-    ValueVector.Accessor accessor = vector.getAccessor();
-    int count = accessor.getValueCount();
-
-    NullableIntVector indices = new NullableIntVector(vector.getField().getName(), vector.getAllocator());
-    indices.allocateNew(count);
-    NullableIntVector.Mutator mutator = indices.getMutator();
-
-    int nextIndex = 0;
-    for (int i = 0; i < count; i++) {
-      Object value = accessor.getObject(i);
-      if (value != null) { // if it's null leave it null
-        Integer index = lookUps.get(value);
-        if (index == null) {
-          index = nextIndex++;
-          lookUps.put(value, index);
-          transfers.put(i, index);
-        }
-        mutator.set(i, index);
-      }
-    }
-    mutator.setValueCount(count);
-
-    // copy the dictionary values into the dictionary vector
-    TransferPair dictionaryTransfer = vector.getTransferPair(vector.getAllocator());
-    ValueVector dictionaryVector = dictionaryTransfer.getTo();
-    dictionaryVector.allocateNewSafe();
-    for (Map.Entry<Integer, Integer> entry: transfers.entrySet()) {
-      dictionaryTransfer.copyValueSafe(entry.getKey(), entry.getValue());
-    }
-    dictionaryVector.getMutator().setValueCount(transfers.size());
-    Dictionary dictionary = new Dictionary(dictionaryVector, false);
-
-    return new DictionaryVector(indices, dictionary);
-  }
-
-  /**
-   * Dictionary encodes a vector with a provided dictionary. The dictionary must contain all values in the vector.
-   *
-   * @param vector vector to encode
-   * @param dictionary dictionary used for encoding
-   * @return dictionary encoded vector
-   */
-  public static DictionaryVector encode(ValueVector vector, Dictionary dictionary) {
-    validateType(vector.getMinorType());
-    // load dictionary values into a hashmap for lookup
-    ValueVector.Accessor dictionaryAccessor = dictionary.getDictionary().getAccessor();
-    Map<Object, Integer> lookUps = new HashMap<>(dictionaryAccessor.getValueCount());
-    for (int i = 0; i < dictionaryAccessor.getValueCount(); i++) {
-      // for primitive array types we need a wrapper that implements equals and hashcode appropriately
-      lookUps.put(dictionaryAccessor.getObject(i), i);
-    }
-
-    // vector to hold our indices (dictionary encoded values)
-    NullableIntVector indices = new NullableIntVector(vector.getField().getName(), vector.getAllocator());
-    NullableIntVector.Mutator mutator = indices.getMutator();
-
-    ValueVector.Accessor accessor = vector.getAccessor();
-    int count = accessor.getValueCount();
-
-    indices.allocateNew(count);
-
-    for (int i = 0; i < count; i++) {
-      Object value = accessor.getObject(i);
-      if (value != null) { // if it's null leave it null
-        // note: this may fail if value was not included in the dictionary
-        mutator.set(i, lookUps.get(value));
-      }
-    }
-    mutator.setValueCount(count);
-
-    return new DictionaryVector(indices, dictionary);
-  }
-
-  /**
-   * Decodes a dictionary encoded array using the provided dictionary.
-   *
-   * @param indices dictionary encoded values, must be int type
-   * @param dictionary dictionary used to decode the values
-   * @return vector with values restored from dictionary
-   */
-  public static ValueVector decode(ValueVector indices, Dictionary dictionary) {
-    ValueVector.Accessor accessor = indices.getAccessor();
-    int count = accessor.getValueCount();
-    ValueVector dictionaryVector = dictionary.getDictionary();
-    // copy the dictionary values into the decoded vector
-    TransferPair transfer = dictionaryVector.getTransferPair(indices.getAllocator());
-    transfer.getTo().allocateNewSafe();
-    for (int i = 0; i < count; i++) {
-      Object index = accessor.getObject(i);
-      if (index != null) {
-        transfer.copyValueSafe(((Number) index).intValue(), i);
-      }
-    }
-
-    ValueVector decoded = transfer.getTo();
-    decoded.getMutator().setValueCount(count);
-    return decoded;
-  }
-
-  private static void validateType(MinorType type) {
-    // byte arrays don't work as keys in our dictionary map - we could wrap them with something to
-    // implement equals and hashcode if we want that functionality
-    if (type == MinorType.VARBINARY || type == MinorType.LIST || type == MinorType.MAP || type == MinorType.UNION) {
-      throw new IllegalArgumentException("Dictionary encoding for complex types not implemented");
-    }
-  }
-
-  public ValueVector getIndexVector() { return indices; }
-
-  public ValueVector getDictionaryVector() { return dictionary.getDictionary(); }
-
-  public Dictionary getDictionary() { return dictionary; }
-
-  @Override
-  public MinorType getMinorType() { return indices.getMinorType(); }
-
-  @Override
-  public Field getField() { return indices.getField(); }
-
-  // note: dictionary vector is not closed, as it may be shared
-  @Override
-  public void close() { indices.close(); }
-
-  @Override
-  public void allocateNew() throws OutOfMemoryException { indices.allocateNew(); }
-
-  @Override
-  public boolean allocateNewSafe() { return indices.allocateNewSafe(); }
-
-  @Override
-  public BufferAllocator getAllocator() { return indices.getAllocator();  }
-
-  @Override
-  public void setInitialCapacity(int numRecords) { indices.setInitialCapacity(numRecords); }
-
-  @Override
-  public int getValueCapacity() { return indices.getValueCapacity(); }
-
-  @Override
-  public int getBufferSize() { return indices.getBufferSize(); }
-
-  @Override
-  public int getBufferSizeFor(int valueCount) { return indices.getBufferSizeFor(valueCount); }
-
-  @Override
-  public Iterator<ValueVector> iterator() {
-    return indices.iterator();
-  }
-
-  @Override
-  public void clear() { indices.clear(); }
-
-  @Override
-  public TransferPair getTransferPair(BufferAllocator allocator) { return indices.getTransferPair(allocator); }
-
-  @Override
-  public TransferPair getTransferPair(String ref, BufferAllocator allocator) { return indices.getTransferPair(ref, allocator); }
-
-  @Override
-  public TransferPair makeTransferPair(ValueVector target) { return indices.makeTransferPair(target); }
-
-  @Override
-  public Accessor getAccessor() { return indices.getAccessor(); }
-
-  @Override
-  public Mutator getMutator() { return indices.getMutator(); }
-
-  @Override
-  public FieldReader getReader() { return indices.getReader(); }
-
-  @Override
-  public ArrowBuf[] getBuffers(boolean clear) { return indices.getBuffers(clear); }
-}