You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/04/15 17:56:06 UTC

[5/6] drill git commit: DRILL-2695: Add Support for large in conditions through the use of the Values operator. Update JSON reader to support reading Extended JSON. Update JSON writer to support writing extended JSON data. Update JSON reader to automatic

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 6cf1ce5..2e65466 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -25,6 +25,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
@@ -75,6 +76,7 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
     options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig)writer.getStorageConfig()).connection);
 
     options.put("extension", "json");
+    options.put("extended", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_EXTENDED_TYPES)));
 
     RecordWriter recordWriter = new JsonRecordWriter();
     recordWriter.init(options);

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 91e0b21..4c44dbd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.drill.common.exceptions.DrillUserException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -33,76 +32,134 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.easy.json.JsonProcessor.ReadState;
 import org.apache.drill.exec.store.easy.json.reader.CountingJsonReader;
 import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.complex.fn.JsonReader;
-import org.apache.drill.exec.store.easy.json.JsonProcessor.ReadState;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-
-import com.fasterxml.jackson.core.JsonParseException;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
 public class JSONRecordReader extends AbstractRecordReader {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
 
-  private OutputMutator mutator;
   private VectorContainerWriter writer;
+
+  // Data we're consuming
   private Path hadoopPath;
+  private JsonNode embeddedContent;
   private InputStream stream;
-  private DrillFileSystem fileSystem;
+  private final DrillFileSystem fileSystem;
   private JsonProcessor jsonReader;
   private int recordCount;
-  private FragmentContext fragmentContext;
+  private final FragmentContext fragmentContext;
   private OperatorContext operatorContext;
-  private boolean enableAllTextMode;
+  private final boolean enableAllTextMode;
+
+  /**
+   * Create a JSON Record Reader that uses a file based input stream.
+   * @param fragmentContext
+   * @param inputPath
+   * @param fileSystem
+   * @param columns
+   * @throws OutOfMemoryException
+   */
+  public JSONRecordReader(final FragmentContext fragmentContext, final String inputPath, final DrillFileSystem fileSystem,
+      final List<SchemaPath> columns) throws OutOfMemoryException {
+    this(fragmentContext, inputPath, null, fileSystem, columns);
+  }
+
+  /**
+   * Create a new JSON Record Reader that uses a in memory materialized JSON stream.
+   * @param fragmentContext
+   * @param inputPath
+   * @param fileSystem
+   * @param columns
+   * @throws OutOfMemoryException
+   */
+  public JSONRecordReader(final FragmentContext fragmentContext, final JsonNode embeddedContent, final DrillFileSystem fileSystem,
+      final List<SchemaPath> columns) throws OutOfMemoryException {
+    this(fragmentContext, null, embeddedContent, fileSystem, columns);
+  }
+
+  private JSONRecordReader(final FragmentContext fragmentContext, final String inputPath, final JsonNode embeddedContent, final DrillFileSystem fileSystem,
+                          final List<SchemaPath> columns) throws OutOfMemoryException {
+
+    Preconditions.checkArgument(
+        (inputPath == null && embeddedContent != null) ||
+        (inputPath != null && embeddedContent == null),
+        "One of inputPath or embeddedContent must be set but not both."
+        );
+
+    if(inputPath != null){
+      this.hadoopPath = new Path(inputPath);
+    }else{
+      this.embeddedContent = embeddedContent;
+    }
 
-  public JSONRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
-                          List<SchemaPath> columns) throws OutOfMemoryException {
-    this.hadoopPath = new Path(inputPath);
     this.fileSystem = fileSystem;
     this.fragmentContext = fragmentContext;
-    this.enableAllTextMode = fragmentContext.getOptions().getOption(ExecConstants.JSON_ALL_TEXT_MODE).bool_val;
+
+    // only enable all text mode if we aren't using embedded content mode.
+    this.enableAllTextMode = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR);
     setColumns(columns);
   }
 
   @Override
-  public void setup(OutputMutator output) throws ExecutionSetupException {
+  public void setup(final OutputMutator output) throws ExecutionSetupException {
     try{
-      CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
-      CompressionCodec codec = factory.getCodec(hadoopPath); // infers from file ext.
-      if (codec != null) {
-        this.stream = codec.createInputStream(fileSystem.open(hadoopPath));
-      } else {
-        this.stream = fileSystem.open(hadoopPath);
-      }
+      setupData();
       this.writer = new VectorContainerWriter(output);
-      this.mutator = output;
       if (isSkipQuery()) {
         this.jsonReader = new CountingJsonReader(fragmentContext.getManagedBuffer());
       } else {
-        this.jsonReader = new JsonReader(fragmentContext.getManagedBuffer(), ImmutableList.copyOf(getColumns()), enableAllTextMode);
+        this.jsonReader = new JsonReader(fragmentContext.getManagedBuffer(), ImmutableList.copyOf(getColumns()), enableAllTextMode, true);
       }
-      this.jsonReader.setSource(stream);
-    }catch(Exception e){
+      setupParser();
+    }catch(final Exception e){
       handleAndRaise("Failure reading JSON file.", e);
     }
   }
 
-  protected void handleAndRaise(String suffix, Exception e) throws DrillUserException {
+  private void setupData() throws IOException{
+    if(hadoopPath != null){
+      final CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
+      final CompressionCodec codec = factory.getCodec(hadoopPath); // infers from file ext.
+      if (codec != null) {
+        this.stream = codec.createInputStream(fileSystem.open(hadoopPath));
+      } else {
+        this.stream = fileSystem.open(hadoopPath);
+      }
+    }
+  }
+
+  private void setupParser() throws IOException{
+    if(hadoopPath != null){
+      jsonReader.setSource(stream);
+    }else{
+      jsonReader.setSource(embeddedContent);
+    }
+  }
+
+  protected void handleAndRaise(final String suffix, final Exception e) throws DrillUserException {
 
     String message = e.getMessage();
     int columnNr = -1;
 
     if (e instanceof JsonParseException) {
-      JsonParseException ex = (JsonParseException) e;
+      final JsonParseException ex = (JsonParseException) e;
       message = ex.getOriginalMessage();
       columnNr = ex.getLocation().getColumnNr();
     }
 
-    DrillUserException.Builder builder = new DrillUserException.Builder(ErrorType.DATA_READ, e, "%s - %s", suffix, message);
+    final DrillUserException.Builder builder = new DrillUserException.Builder(ErrorType.DATA_READ, e, "%s - %s", suffix, message);
 
     // add context information
     builder.add("Filename: " + hadoopPath.toUri().getPath());
@@ -119,7 +176,7 @@ public class JSONRecordReader extends AbstractRecordReader {
     return operatorContext;
   }
 
-  public void setOperatorContext(OperatorContext operatorContext) {
+  public void setOperatorContext(final OperatorContext operatorContext) {
     this.operatorContext = operatorContext;
   }
 
@@ -151,15 +208,13 @@ public class JSONRecordReader extends AbstractRecordReader {
       writer.setValueCount(recordCount);
 //      p.stop();
 //      System.out.println(String.format("Wrote %d records in %dms.", recordCount, p.elapsed(TimeUnit.MILLISECONDS)));
-      if (recordCount == 0 && write == ReadState.WRITE_FAILURE) {
-        throw new IOException("Record was too large to copy into vector.");
-      }
+
 
       return recordCount;
 
-    } catch (JsonParseException e) {
+    } catch (final JsonParseException e) {
       handleAndRaise("Error parsing JSON.", e);
-    } catch (IOException e) {
+    } catch (final IOException e) {
       handleAndRaise("Error reading JSON.", e);
     }
     // this is never reached
@@ -169,8 +224,10 @@ public class JSONRecordReader extends AbstractRecordReader {
   @Override
   public void cleanup() {
     try {
-      stream.close();
-    } catch (IOException e) {
+      if(stream != null){
+        stream.close();
+      }
+    } catch (final IOException e) {
       logger.warn("Failure while closing stream.", e);
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java
index ce6017b..b310818 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java
@@ -22,10 +22,11 @@ import java.io.InputStream;
 
 import org.apache.drill.exec.vector.complex.writer.BaseWriter;
 
+import com.fasterxml.jackson.databind.JsonNode;
+
 public interface JsonProcessor {
 
   public static enum ReadState {
-    WRITE_FAILURE,
     END_OF_STREAM,
     WRITE_SUCCEED
   }
@@ -33,6 +34,7 @@ public interface JsonProcessor {
   ReadState write(BaseWriter.ComplexWriter writer) throws IOException;
 
   void setSource(InputStream is) throws IOException;
+  void setSource(JsonNode node);
 
   void ensureAtLeastOneField(BaseWriter.ComplexWriter writer);
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
index 76c4ace..a43a4a0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
@@ -26,6 +26,8 @@ import org.apache.drill.exec.store.EventBasedRecordWriter;
 import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
 import org.apache.drill.exec.store.JSONOutputRecordWriter;
 import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.vector.complex.fn.BasicJsonOutput;
+import org.apache.drill.exec.vector.complex.fn.ExtendedJsonOutput;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -33,17 +35,19 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
 import com.google.common.collect.Lists;
 
 public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWriter {
 
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonRecordWriter.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonRecordWriter.class);
 
   private String location;
   private String prefix;
 
   private String fieldDelimiter;
   private String extension;
+  private boolean useExtendedOutput;
 
   private int index;
   private FileSystem fs = null;
@@ -63,6 +67,7 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
     this.prefix = writerOptions.get("prefix");
     this.fieldDelimiter = writerOptions.get("separator");
     this.extension = writerOptions.get("extension");
+    this.useExtendedOutput = Boolean.parseBoolean(writerOptions.get("extended"));
 
     Configuration conf = new Configuration();
     conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
@@ -71,7 +76,12 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
     Path fileName = new Path(location, prefix + "_" + index + "." + extension);
     try {
       stream = fs.create(fileName);
-      gen = factory.createGenerator(stream).useDefaultPrettyPrinter();
+      JsonGenerator generator = factory.createGenerator(stream).useDefaultPrettyPrinter();
+      if(useExtendedOutput){
+        gen = new ExtendedJsonOutput(generator);
+      }else{
+        gen = new BasicJsonOutput(generator);
+      }
       logger.debug("Created file: {}", fileName);
     } catch (IOException ex) {
       logger.error("Unable to create file: " + fileName, ex);

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/RewindableUtf8Reader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/RewindableUtf8Reader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/RewindableUtf8Reader.java
deleted file mode 100644
index b9075de..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/RewindableUtf8Reader.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.easy.json;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.hadoop.fs.Seekable;
-
-import com.fasterxml.jackson.core.JsonStreamContextExposer;
-import com.fasterxml.jackson.core.JsonToken;
-import com.fasterxml.jackson.core.JsonParser.Feature;
-import com.fasterxml.jackson.core.io.IOContext;
-import com.fasterxml.jackson.core.json.JsonReadContext;
-import com.fasterxml.jackson.core.json.JsonReadContextExposer;
-import com.fasterxml.jackson.core.json.UTF8StreamJsonParser;
-import com.fasterxml.jackson.core.sym.BytesToNameCanonicalizer;
-
-/**
- * An extended version of Jaskon's UTF8StreamJsonParser that supports rewind the stream to the previous record.
- */
-public class RewindableUtf8Reader<T extends InputStream & Seekable> extends UTF8StreamJsonParser {
-
-  private T in;
-
-  /**
-   * Index of character after last available one in the buffer.
-   */
-  private long markFilePos;
-  private int markInputPtr;
-  private int markInputEnd;
-  private long markInputProcessed;
-  private int markInputRow;
-  private int markInputRowStart;
-  private long markInputTotal;
-  private int markTokenInputRow;
-  private int markTokenInputCol;
-  private JsonToken markToken;
-  private JsonToken markLastToken;
-  private JsonReadContext markContext;
-  private JsonReadContext rootContext;
-
-  private int type;
-  private int lineNr;
-  private int colNr;
-
-  private boolean closed = false;
-
-  public RewindableUtf8Reader(IOContext ctxt, int features, BytesToNameCanonicalizer sym, byte[] inputBuffer) {
-    super(ctxt, features, null, null, sym, inputBuffer, 0, 0, true);
-    this.rootContext = this._parsingContext;
-  }
-
-  public void mark() throws IOException{
-    this.markFilePos = this.in.getPos();
-    this.markInputPtr = this._inputPtr;
-    this.markInputEnd = this._inputEnd;
-    this.markInputProcessed = this._currInputProcessed;
-    this.markInputRow = this._currInputRow;
-    this.markInputRowStart = this._currInputRowStart;
-    this.markInputTotal = this._tokenInputTotal;
-    this.markTokenInputCol = this._tokenInputCol;
-    this.markTokenInputRow = this._tokenInputRow;
-    this.markToken = this._currToken;
-    this.markLastToken = this._lastClearedToken;
-    this.markContext = this._parsingContext;
-    this.type = JsonStreamContextExposer.getType(markContext);
-    this.lineNr = JsonReadContextExposer.getLineNmbr(markContext);
-    this.colNr = JsonReadContextExposer.getColNmbr(markContext);
-  }
-
-  public void resetToMark() throws IOException{
-    if(markFilePos != in.getPos()){
-      in.seek(markFilePos - _inputBuffer.length);
-      in.read(_inputBuffer, 0, _inputBuffer.length);
-    }
-    this._inputPtr = this.markInputPtr;
-    this._inputEnd = this.markInputEnd;
-    this._currInputProcessed = this.markInputProcessed;
-    this._currInputRow = this.markInputRow;
-    this._currInputRowStart = this.markInputRowStart;
-    this._tokenInputTotal = this.markInputTotal;
-    this._tokenInputCol = this.markTokenInputCol;
-    this._tokenInputRow = this.markTokenInputRow;
-    this._currToken = this.markToken;
-    this._lastClearedToken = this.markLastToken;
-    this._parsingContext = this.markContext;
-    JsonReadContextExposer.reset(markContext, type, lineNr, colNr);
-
-  }
-
-  @Override
-  protected void _closeInput() throws IOException {
-    super._closeInput();
-
-      if (_inputStream != null) {
-          if (_ioContext.isResourceManaged() || isEnabled(Feature.AUTO_CLOSE_SOURCE)) {
-              _inputStream.close();
-          }
-          _inputStream = null;
-      }
-      this.closed = true;
-
-  }
-
-  public void setInputStream(T in) throws IOException{
-    if(this.in != null){
-      in.close();
-    }
-
-    this._inputStream = in;
-    this.in = in;
-    this._parsingContext = rootContext;
-    this._inputPtr = 0;
-    this._inputEnd = 0;
-    this._currInputProcessed = 0;
-    this._currInputRow = 0;
-    this._currInputRowStart = 0;
-    this._tokenInputTotal = 0;
-    this._tokenInputCol = 0;
-    this._tokenInputRow = 0;
-    this._currToken = null;
-    this._lastClearedToken = null;
-    this.closed = false;
-  }
-
-  public boolean hasDataAvailable() throws IOException{
-    return !closed;
-  }
-
-  @Override
-  public String toString() {
-    return "RewindableUtf8Reader [markFilePos=" + markFilePos + ", markInputPtr=" + markInputPtr + ", markInputEnd="
-        + markInputEnd + ", markInputProcessed=" + markInputProcessed + ", markInputRow=" + markInputRow
-        + ", markInputRowStart=" + markInputRowStart + ", markInputTotal=" + markInputTotal + ", markTokenInputRow="
-        + markTokenInputRow + ", markTokenInputCol=" + markTokenInputCol + ", markToken=" + markToken
-        + ", markLastToken=" + markLastToken + ", markContext=" + markContext + ", rootContext=" + rootContext
-        + ", type=" + type + ", lineNr=" + lineNr + ", colNr=" + colNr + "]";
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
index 509798a..718bb09 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
@@ -17,42 +17,40 @@
  */
 package org.apache.drill.exec.store.easy.json.reader;
 
+import io.netty.buffer.DrillBuf;
+
 import java.io.IOException;
 import java.io.InputStream;
 
-import com.fasterxml.jackson.core.JsonFactory;
+import org.apache.drill.exec.store.easy.json.JsonProcessor;
+
 import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.io.IOContext;
-import com.fasterxml.jackson.core.sym.BytesToNameCanonicalizer;
-import com.fasterxml.jackson.core.util.BufferRecycler;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.TreeTraversingParser;
 import com.google.common.base.Preconditions;
-import io.netty.buffer.DrillBuf;
-import org.apache.drill.exec.store.easy.json.JsonProcessor;
-import org.apache.drill.exec.store.easy.json.RewindableUtf8Reader;
 
 public abstract class BaseJsonProcessor implements JsonProcessor {
 
-  protected final RewindableUtf8Reader parser;
+  private static final ObjectMapper MAPPER = new ObjectMapper() //
+    .configure(JsonParser.Feature.ALLOW_COMMENTS, true)
+    .configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
+
+  protected JsonParser parser;
   protected DrillBuf workBuf;
 
   public BaseJsonProcessor(DrillBuf workBuf) {
     this.workBuf = Preconditions.checkNotNull(workBuf);
-    this.parser = Preconditions.checkNotNull(createParser());
   }
 
-  protected RewindableUtf8Reader createParser() {
-    final BufferRecycler recycler = new BufferRecycler();
-    final IOContext context = new IOContext(recycler, this, false);
-    final int features = JsonParser.Feature.collectDefaults() //
-        | JsonParser.Feature.ALLOW_COMMENTS.getMask() //
-        | JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES.getMask();
-
-    final BytesToNameCanonicalizer can = BytesToNameCanonicalizer.createRoot();
-    return new RewindableUtf8Reader<>(context, features, can.makeChild(JsonFactory.Feature.collectDefaults()), context.allocReadIOBuffer());
+  @Override
+  public void setSource(InputStream is) throws IOException {
+    this.parser = MAPPER.getFactory().createParser(is);
   }
 
   @Override
-  public void setSource(InputStream is) throws IOException {
-    parser.setInputStream(is);
+  public void setSource(JsonNode node) {
+    this.parser = new TreeTraversingParser(node);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
index 1ef71e7..c4ab1ee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
@@ -24,7 +24,7 @@ import io.netty.buffer.DrillBuf;
 import org.apache.drill.exec.store.easy.json.JsonProcessor;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter;
 
-public class CountingJsonReader extends BaseJsonProcessor implements JsonProcessor {
+public class CountingJsonReader extends BaseJsonProcessor {
 
   public CountingJsonReader(DrillBuf workBuf) {
     super(workBuf);
@@ -40,7 +40,7 @@ public class CountingJsonReader extends BaseJsonProcessor implements JsonProcess
     }
     writer.rootAsMap().bit("count").writeBit(1);
     parser.skipChildren();
-    return writer.ok() ? ReadState.WRITE_SUCCEED : ReadState.WRITE_FAILURE;
+    return ReadState.WRITE_SUCCEED;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 9f8357b..4d837c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -316,15 +316,6 @@ public class DrillParquetReader extends AbstractRecordReader {
       recordReader.read();
       count++;
       totalRead++;
-      if (count % fillLevelCheckFrequency == 0) {
-        if (getPercentFilled() > fillLevelCheckThreshold) {
-          if(!recordMaterializer.ok()){
-            String message = String.format("The setting for `%s` is too high for your Parquet records. Please set a lower check threshold and retry your query. ", ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD);
-            handleAndRaise(message, new RuntimeException(message));
-          }
-          break;
-        }
-      }
     }
     writer.setValueCount(count);
     // if we have requested columns that were not found in the file fill their vectors with null

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
index 720e8be..574df40 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
@@ -42,10 +42,6 @@ public class DrillParquetRecordMaterializer extends RecordMaterializer<Void> {
     complexWriter.setPosition(position);
   }
 
-  public boolean ok() {
-    return complexWriter.ok();
-  }
-
   @Override
   public Void getCurrentRecord() {
     return null;

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java
index 2c2ff54..df4279a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.vector;
 
-public interface RepeatedVector {
+public interface RepeatedVector extends ValueVector {
   public static final int DEFAULT_REPEAT_PER_RECORD = 4;
 
   public RepeatedFixedWidthVector.RepeatedAccessor getAccessor();

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/WriteState.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/WriteState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/WriteState.java
deleted file mode 100644
index 43dba65..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/WriteState.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.vector.complex;
-
-import org.apache.drill.exec.vector.complex.writer.FieldWriter;
-
-
-public class WriteState {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WriteState.class);
-
-  private FieldWriter failPoint;
-
-  public boolean isFailed(){
-    return failPoint != null;
-  }
-
-  public boolean isOk(){
-    return failPoint == null;
-  }
-
-  public void fail(FieldWriter w){
-    assert failPoint == null;
-    failPoint = w;
-
-//    System.out.println("Fail Point " + failPoint);
-  }
-
-  public void reset(){
-    failPoint = null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/BasicJsonOutput.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/BasicJsonOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/BasicJsonOutput.java
new file mode 100644
index 0000000..364692e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/BasicJsonOutput.java
@@ -0,0 +1,530 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex.fn;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.DateUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Period;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+import org.joda.time.format.ISOPeriodFormat;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Preconditions;
+
+/**
+ * A JSON output class that generates standard JSON. By default, literals are output such that they can be implicitly
+ * cast.
+ */
+public class BasicJsonOutput implements JsonOutput {
+
+  protected final JsonGenerator gen;
+  private final DateTimeFormatter dateFormatter;
+  private final DateTimeFormatter timeFormatter;
+  private final DateTimeFormatter timestampFormatter;
+
+  public BasicJsonOutput(JsonGenerator gen) {
+    this(gen, DateOutputFormat.SQL);
+  }
+
+  protected BasicJsonOutput(JsonGenerator gen, DateOutputFormat dateOutput) {
+    Preconditions.checkNotNull(dateOutput);
+    Preconditions.checkNotNull(gen);
+
+    this.gen = gen;
+
+    switch (dateOutput) {
+    case SQL: {
+      dateFormatter = DateUtility.formatDate;
+      timeFormatter = DateUtility.formatTime;
+      timestampFormatter = DateUtility.formatTimeStamp;
+      break;
+    }
+    case ISO: {
+      dateFormatter = ISODateTimeFormat.date();
+      timeFormatter = ISODateTimeFormat.time();
+      timestampFormatter = ISODateTimeFormat.dateTime();
+      break;
+    }
+
+    default:
+      throw new UnsupportedOperationException(String.format("Unable to support date output of type %s.", dateOutput));
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    gen.flush();
+  }
+
+  @Override
+  public void writeStartArray() throws IOException {
+    gen.writeStartArray();
+  }
+
+  @Override
+  public void writeEndArray() throws IOException {
+    gen.writeEndArray();
+  }
+
+  @Override
+  public void writeStartObject() throws IOException {
+    gen.writeStartObject();
+  }
+
+  @Override
+  public void writeEndObject() throws IOException {
+    gen.writeEndObject();
+  }
+
+  @Override
+  public void writeUntypedNull() throws IOException {
+    gen.writeNull();
+  }
+
+  @Override
+  public void writeFieldName(String name) throws IOException {
+    gen.writeFieldName(name);
+  }
+
+  @Override
+  public void writeDecimal(FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeDecimal(reader.readBigDecimal());
+    } else {
+      writeDecimalNull();
+    }
+  }
+
+  @Override
+  public void writeTinyInt(FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeTinyInt(reader.readByte());
+    } else {
+      writeTinyIntNull();
+    }
+  }
+
+  @Override
+  public void writeSmallInt(FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeSmallInt(reader.readShort());
+    } else {
+      writeSmallIntNull();
+    }
+  }
+
+  @Override
+  public void writeInt(FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeInt(reader.readInteger());
+    } else {
+      writeIntNull();
+    }
+  }
+
+  @Override
+  public void writeBigInt(FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeBigInt(reader.readLong());
+    } else {
+      writeBigIntNull();
+    }
+  }
+
+  @Override
+  public void writeFloat(FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeFloat(reader.readFloat());
+    } else {
+      writeFloatNull();
+    }
+  }
+
+  @Override
+  public void writeDouble(FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeDouble(reader.readDouble());
+    } else {
+      writeDoubleNull();
+    }
+  }
+
+  @Override
+  public void writeVarChar(FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeVarChar(reader.readText().toString());
+    } else {
+      writeVarcharNull();
+    }
+  }
+
+  @Override
+  public void writeVar16Char(FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeVar16Char(reader.readText().toString());
+    } else {
+      writeVar16charNull();
+    }
+  }
+
+  @Override
+  public void writeBinary(FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeBinary(reader.readByteArray());
+    } else {
+      writeBinaryNull();
+    }
+  }
+
+  @Override
+  public void writeBoolean(FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeBoolean(reader.readBoolean());
+    } else {
+      writeBooleanNull();
+    }
+  }
+
+  @Override
+  public void writeDate(FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeDate(reader.readDateTime());
+    } else {
+      writeDateNull();
+    }
+  }
+
+  @Override
+  public void writeTime(FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeTime(reader.readDateTime());
+    } else {
+      writeTimeNull();
+    }
+  }
+
+  @Override
+  public void writeTimestamp(FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeTimestamp(reader.readDateTime());
+    } else {
+      writeTimeNull();
+    }
+  }
+
+  @Override
+  public void writeInterval(FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeInterval(reader.readPeriod());
+    } else {
+      writeIntervalNull();
+    }
+  }
+
+  @Override
+  public void writeTinyInt(int index, FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeTinyInt(reader.readByte(index));
+    } else {
+      writeTinyIntNull();
+    }
+  }
+
+  @Override
+  public void writeSmallInt(int index, FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeSmallInt(reader.readShort(index));
+    } else {
+      writeSmallIntNull();
+    }
+  }
+
+  @Override
+  public void writeInt(int index, FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeInt(reader.readInteger(index));
+    } else {
+      writeIntNull();
+    }
+  }
+
+  @Override
+  public void writeBigInt(int index, FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeBigInt(reader.readLong(index));
+    } else {
+      writeBigIntNull();
+    }
+  }
+
+  @Override
+  public void writeFloat(int index, FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeFloat(reader.readFloat(index));
+    } else {
+      writeFloatNull();
+    }
+  }
+
+  @Override
+  public void writeDouble(int index, FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeDouble(reader.readDouble(index));
+    } else {
+      writeDoubleNull();
+    }
+  }
+
+  @Override
+  public void writeVarChar(int index, FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeVarChar(reader.readText(index).toString());
+    } else {
+      writeVarcharNull();
+    }
+  }
+
+  @Override
+  public void writeVar16Char(int index, FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeVar16Char(reader.readString(index));
+    } else {
+      writeVar16charNull();
+    }
+  }
+
+  @Override
+  public void writeBinary(int index, FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeBinary(reader.readByteArray(index));
+    } else {
+      writeBinaryNull();
+    }
+  }
+
+  @Override
+  public void writeBoolean(int index, FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeBoolean(reader.readBoolean(index));
+    } else {
+      writeBooleanNull();
+    }
+  }
+
+  @Override
+  public void writeDate(int index, FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeDate(reader.readDateTime(index));
+    } else {
+      writeDateNull();
+    }
+  }
+
+  @Override
+  public void writeTime(int index, FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeTime(reader.readDateTime(index));
+    } else {
+      writeTimeNull();
+    }
+  }
+
+  @Override
+  public void writeTimestamp(int index, FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeTimestamp(reader.readDateTime(index));
+    } else {
+      writeTimestampNull();
+    }
+  }
+
+  @Override
+  public void writeInterval(int index, FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeInterval(reader.readPeriod(index));
+    } else {
+      writeIntervalNull();
+    }
+  }
+
+  @Override
+  public void writeDecimal(int index, FieldReader reader) throws IOException {
+    if (reader.isSet()) {
+      writeDecimal(reader.readBigDecimal(index));
+    } else {
+      writeDecimalNull();
+    }
+  }
+
+  @Override
+  public void writeDecimal(BigDecimal value) throws IOException {
+    gen.writeNumber(value);
+  }
+
+  @Override
+  public void writeTinyInt(byte value) throws IOException {
+    gen.writeNumber(value);
+  }
+
+  @Override
+  public void writeSmallInt(short value) throws IOException {
+    gen.writeNumber(value);
+  }
+
+  @Override
+  public void writeInt(int value) throws IOException {
+    gen.writeNumber(value);
+  }
+
+  @Override
+  public void writeBigInt(long value) throws IOException {
+    gen.writeNumber(value);
+  }
+
+  @Override
+  public void writeFloat(float value) throws IOException {
+    gen.writeNumber(value);
+  }
+
+  @Override
+  public void writeDouble(double value) throws IOException {
+    gen.writeNumber(value);
+  }
+
+  @Override
+  public void writeVarChar(String value) throws IOException {
+    gen.writeString(value);
+  }
+
+  @Override
+  public void writeVar16Char(String value) throws IOException {
+    gen.writeString(value);
+  }
+
+  @Override
+  public void writeBinary(byte[] value) throws IOException {
+    gen.writeBinary(value);
+  }
+
+  @Override
+  public void writeBoolean(boolean value) throws IOException {
+    gen.writeBoolean(value);
+  }
+
+  @Override
+  public void writeDate(DateTime value) throws IOException {
+    gen.writeString(dateFormatter.print(value.withZone(DateTimeZone.UTC)));
+  }
+
+  @Override
+  public void writeTime(DateTime value) throws IOException {
+    gen.writeString(timeFormatter.print(value.withZone(DateTimeZone.UTC)));
+  }
+
+  @Override
+  public void writeTimestamp(DateTime value) throws IOException {
+    gen.writeString(timestampFormatter.print(value.withZone(DateTimeZone.UTC)));
+  }
+
+  @Override
+  public void writeInterval(Period value) throws IOException {
+    gen.writeString(value.toString(ISOPeriodFormat.standard()));
+  }
+
+  @Override
+  public void writeDecimalNull() throws IOException {
+    gen.writeNull();
+  }
+
+  @Override
+  public void writeTinyIntNull() throws IOException {
+    gen.writeNull();
+  }
+
+  @Override
+  public void writeSmallIntNull() throws IOException {
+    gen.writeNull();
+  }
+
+  @Override
+  public void writeIntNull() throws IOException {
+    gen.writeNull();
+  }
+
+  @Override
+  public void writeBigIntNull() throws IOException {
+    gen.writeNull();
+  }
+
+  @Override
+  public void writeFloatNull() throws IOException {
+    gen.writeNull();
+  }
+
+  @Override
+  public void writeDoubleNull() throws IOException {
+    gen.writeNull();
+  }
+
+  @Override
+  public void writeVarcharNull() throws IOException {
+    gen.writeNull();
+  }
+
+  @Override
+  public void writeVar16charNull() throws IOException {
+    gen.writeNull();
+  }
+
+  @Override
+  public void writeBinaryNull() throws IOException {
+    gen.writeNull();
+  }
+
+  @Override
+  public void writeBooleanNull() throws IOException {
+    gen.writeNull();
+  }
+
+  @Override
+  public void writeDateNull() throws IOException {
+    gen.writeNull();
+  }
+
+  @Override
+  public void writeTimeNull() throws IOException {
+    gen.writeNull();
+  }
+
+  @Override
+  public void writeTimestampNull() throws IOException {
+    gen.writeNull();
+  }
+
+  @Override
+  public void writeIntervalNull() throws IOException {
+    gen.writeNull();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/DateOutputFormat.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/DateOutputFormat.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/DateOutputFormat.java
new file mode 100644
index 0000000..fa91b75
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/DateOutputFormat.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex.fn;
+
+/**
+ * Describes the default date output format to use for JSON. Drill's default behavior for text output formats is to use
+ * a string which can be implicitly casted back to its original type (so the same format as the SQL literal format where
+ * applicable). However, in JSON, we also can use extended types to specifically identify the data type of the output.
+ * In this case, Drill outputs ISO standard formats rather than SQL formats to ensure comaptibility with other systems
+ * (namely MongoDB).
+ */
+public enum DateOutputFormat {
+  /**
+   * The SQL literal format for dates.  This means no timezone in times and a space in between the date and time for timestamp.
+   */
+  SQL,
+
+  /**
+   * The ISO standard format for dates/times.
+   */
+  ISO
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ExtendedJsonOutput.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ExtendedJsonOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ExtendedJsonOutput.java
new file mode 100644
index 0000000..9ac1dd5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ExtendedJsonOutput.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex.fn;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+/**
+ * Writes JSON Output that will wrap Binary, Date, Time, Timestamp, Integer, Decimal and Interval types with wrapping
+ * maps for better type resolution upon deserialization.
+ */
+public class ExtendedJsonOutput extends BasicJsonOutput {
+
+  public ExtendedJsonOutput(JsonGenerator gen) {
+    super(gen, DateOutputFormat.ISO);
+  }
+
+  @Override
+  public void writeBigInt(long value) throws IOException {
+    gen.writeStartObject();
+    gen.writeFieldName(ExtendedType.INTEGER.serialized);
+    super.writeBigInt(value);
+    gen.writeEndObject();
+  }
+
+  @Override
+  public void writeBinary(byte[] value) throws IOException {
+    gen.writeStartObject();
+    gen.writeFieldName(ExtendedType.BINARY.serialized);
+    super.writeBinary(value);
+    gen.writeEndObject();
+  }
+
+  @Override
+  public void writeDate(DateTime value) throws IOException {
+    gen.writeStartObject();
+    gen.writeFieldName(ExtendedType.DATE.serialized);
+    super.writeDate(value);
+    gen.writeEndObject();
+  }
+
+  @Override
+  public void writeTime(DateTime value) throws IOException {
+    gen.writeStartObject();
+    gen.writeFieldName(ExtendedType.TIME.serialized);
+    super.writeTime(value);
+    gen.writeEndObject();
+  }
+
+  @Override
+  public void writeTimestamp(DateTime value) throws IOException {
+    gen.writeStartObject();
+    gen.writeFieldName(ExtendedType.TIMESTAMP.serialized);
+    super.writeTimestamp(value);
+    gen.writeEndObject();
+  }
+
+  @Override
+  public void writeInterval(Period value) throws IOException {
+    gen.writeStartObject();
+    gen.writeFieldName(ExtendedType.INTERVAL.serialized);
+    super.writeInterval(value);
+    gen.writeEndObject();
+  }
+
+  @Override
+  public void writeBigIntNull() throws IOException {
+    gen.writeStartObject();
+    gen.writeFieldName(ExtendedType.INTEGER.serialized);
+    super.writeBigIntNull();
+    gen.writeEndObject();
+  }
+
+  @Override
+  public void writeBinaryNull() throws IOException {
+    gen.writeStartObject();
+    gen.writeFieldName(ExtendedType.BINARY.serialized);
+    super.writeBinaryNull();
+    gen.writeEndObject();
+  }
+
+  @Override
+  public void writeDateNull() throws IOException {
+    gen.writeStartObject();
+    gen.writeFieldName(ExtendedType.DATE.serialized);
+    super.writeDateNull();
+    gen.writeEndObject();
+  }
+
+  @Override
+  public void writeTimeNull() throws IOException {
+    gen.writeStartObject();
+    gen.writeFieldName(ExtendedType.TIME.serialized);
+    super.writeTimeNull();
+    gen.writeEndObject();
+  }
+
+  @Override
+  public void writeTimestampNull() throws IOException {
+    gen.writeStartObject();
+    gen.writeFieldName(ExtendedType.TIMESTAMP.serialized);
+    super.writeTimestampNull();
+    gen.writeEndObject();
+  }
+
+  @Override
+  public void writeIntervalNull() throws IOException {
+    gen.writeStartObject();
+    gen.writeFieldName(ExtendedType.INTERVAL.serialized);
+    super.writeIntervalNull();
+    gen.writeEndObject();
+  }
+
+  @Override
+  public void writeDecimal(BigDecimal value) throws IOException {
+    gen.writeStartObject();
+    gen.writeFieldName(ExtendedType.DECIMAL.serialized);
+    super.writeDecimal(value);
+    gen.writeEndObject();
+  }
+
+  @Override
+  public void writeDecimalNull() throws IOException {
+    gen.writeStartObject();
+    gen.writeFieldName(ExtendedType.DECIMAL.serialized);
+    super.writeDecimalNull();
+    gen.writeEndObject();
+  }
+
+  @Override
+  public void writeTinyInt(byte value) throws IOException {
+    writeBigInt(value);
+  }
+
+  @Override
+  public void writeSmallInt(short value) throws IOException {
+    writeBigInt(value);
+  }
+
+  @Override
+  public void writeInt(int value) throws IOException {
+    writeBigInt(value);
+  }
+
+  @Override
+  public void writeTinyIntNull() throws IOException {
+    writeBigIntNull();
+  }
+
+  @Override
+  public void writeSmallIntNull() throws IOException {
+    writeBigIntNull();
+  }
+
+  @Override
+  public void writeIntNull() throws IOException {
+    writeBigIntNull();
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ExtendedType.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ExtendedType.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ExtendedType.java
new file mode 100644
index 0000000..bec0fd2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ExtendedType.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex.fn;
+
+import com.fasterxml.jackson.core.io.SerializedString;
+
+public enum ExtendedType {
+
+  BINARY(ExtendedTypeName.BINARY),
+  DATE(ExtendedTypeName.DATE),
+  TIME(ExtendedTypeName.TIME),
+  TIMESTAMP(ExtendedTypeName.TIMESTAMP),
+  INTERVAL(ExtendedTypeName.INTERVAL),
+  INTEGER(ExtendedTypeName.INTEGER),
+  DECIMAL(ExtendedTypeName.DECIMAL);
+
+  public final SerializedString serialized;
+
+  ExtendedType(String name) {
+    this.serialized = new SerializedString(name);
+  }
+
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ExtendedTypeName.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ExtendedTypeName.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ExtendedTypeName.java
new file mode 100644
index 0000000..fcef24b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ExtendedTypeName.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex.fn;
+
+public interface ExtendedTypeName {
+  public static final String BINARY = "$binary";      // base64 encoded binary (ZHJpbGw=)  [from Mongo]
+  public static final String DATE = "$dateDay";       // ISO date with no time. such as (12-24-27)
+  public static final String TIME = "$time";          // ISO time with no timezone (19:20:30.45Z)
+  public static final String TIMESTAMP = "$date";     // ISO standard time (2009-02-23T00:00:00.000-08:00) [from Mongo]
+  public static final String INTERVAL = "$interval";  // ISO standard duration (PT26.4S)
+  public static final String INTEGER = "$numberLong"; // 8 byte signed integer (123) [from Mongo]
+  public static final String DECIMAL = "$decimal";    // exact numeric value (123.123)
+}
+

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonOutput.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonOutput.java
new file mode 100644
index 0000000..72720de
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonOutput.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex.fn;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+
+/**
+ * Interface through which UDFs, RecordWriters and other systems can write out a JSON output.
+ * Generally used to control how non-json types are mapped to a json output stream.
+ */
+public interface JsonOutput {
+
+  // basic json tools.
+  void flush() throws IOException;
+  void writeStartArray() throws IOException;
+  void writeEndArray() throws IOException;
+  void writeStartObject() throws IOException;
+  void writeEndObject() throws IOException;
+  void writeUntypedNull() throws IOException;
+  void writeFieldName(String name) throws IOException;
+
+
+  // literals
+  void writeDecimal(BigDecimal value) throws IOException;
+  void writeTinyInt(byte value) throws IOException;
+  void writeSmallInt(short value) throws IOException;
+  void writeInt(int value) throws IOException;
+  void writeBigInt(long value) throws IOException;
+  void writeFloat(float value) throws IOException;
+  void writeDouble(double value) throws IOException;
+  void writeVarChar(String value) throws IOException;
+  void writeVar16Char(String value) throws IOException;
+  void writeBinary(byte[] value) throws IOException;
+  void writeBoolean(boolean value) throws IOException;
+  void writeDate(DateTime value) throws IOException;
+  void writeTime(DateTime value) throws IOException;
+  void writeTimestamp(DateTime value) throws IOException;
+  void writeInterval(Period value) throws IOException;
+  void writeDecimalNull() throws IOException;
+  void writeTinyIntNull() throws IOException;
+  void writeSmallIntNull() throws IOException;
+  void writeIntNull() throws IOException;
+  void writeBigIntNull() throws IOException;
+  void writeFloatNull() throws IOException;
+  void writeDoubleNull() throws IOException;
+  void writeVarcharNull() throws IOException;
+  void writeVar16charNull() throws IOException;
+  void writeBinaryNull() throws IOException;
+  void writeBooleanNull() throws IOException;
+  void writeDateNull() throws IOException;
+  void writeTimeNull() throws IOException;
+  void writeTimestampNull() throws IOException;
+  void writeIntervalNull() throws IOException;
+
+
+  // scalars reader
+  void writeDecimal(FieldReader reader) throws IOException;
+  void writeTinyInt(FieldReader reader) throws IOException;
+  void writeSmallInt(FieldReader reader) throws IOException;
+  void writeInt(FieldReader reader) throws IOException;
+  void writeBigInt(FieldReader reader) throws IOException;
+  void writeFloat(FieldReader reader) throws IOException;
+  void writeDouble(FieldReader reader) throws IOException;
+  void writeVarChar(FieldReader reader) throws IOException;
+  void writeVar16Char(FieldReader reader) throws IOException;
+  void writeBinary(FieldReader reader) throws IOException;
+  void writeBoolean(FieldReader reader) throws IOException;
+  void writeDate(FieldReader reader) throws IOException;
+  void writeTime(FieldReader reader) throws IOException;
+  void writeTimestamp(FieldReader reader) throws IOException;
+  void writeInterval(FieldReader reader) throws IOException;
+
+  // index positioned scalars
+  void writeDecimal(int index, FieldReader reader) throws IOException;
+  void writeTinyInt(int index, FieldReader reader) throws IOException;
+  void writeSmallInt(int index, FieldReader reader) throws IOException;
+  void writeInt(int index, FieldReader reader) throws IOException;
+  void writeBigInt(int index, FieldReader reader) throws IOException;
+  void writeFloat(int index, FieldReader reader) throws IOException;
+  void writeDouble(int index, FieldReader reader) throws IOException;
+  void writeVarChar(int index, FieldReader reader) throws IOException;
+  void writeVar16Char(int index, FieldReader reader) throws IOException;
+  void writeBinary(int index, FieldReader reader) throws IOException;
+  void writeBoolean(int index, FieldReader reader) throws IOException;
+  void writeDate(int index, FieldReader reader) throws IOException;
+  void writeTime(int index, FieldReader reader) throws IOException;
+  void writeTimestamp(int index, FieldReader reader) throws IOException;
+  void writeInterval(int index, FieldReader reader) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index 9738ff8..c196fd2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -21,82 +21,77 @@ import io.netty.buffer.DrillBuf;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.expr.holders.BigIntHolder;
-import org.apache.drill.exec.expr.holders.BitHolder;
-import org.apache.drill.exec.expr.holders.Float8Holder;
-import org.apache.drill.exec.expr.holders.VarCharHolder;
 import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.easy.json.JsonProcessor;
-import org.apache.drill.exec.store.easy.json.RewindableUtf8Reader;
 import org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor;
+import org.apache.drill.exec.vector.complex.fn.VectorOutput.ListVectorOutput;
+import org.apache.drill.exec.vector.complex.fn.VectorOutput.MapVectorOutput;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.Seekable;
 
-import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonParser.Feature;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.JsonToken;
-import com.fasterxml.jackson.core.io.IOContext;
-import com.fasterxml.jackson.core.sym.BytesToNameCanonicalizer;
-import com.fasterxml.jackson.core.util.BufferRecycler;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.io.compress.CompressionInputStream;
 
-public class JsonReader extends BaseJsonProcessor implements JsonProcessor {
+public class JsonReader extends BaseJsonProcessor {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonReader.class);
-  public final static int MAX_RECORD_SIZE = 128*1024;
+  public final static int MAX_RECORD_SIZE = 128 * 1024;
 
+  private final WorkingBuffer workingBuffer;
   private final List<SchemaPath> columns;
   private final boolean allTextMode;
   private boolean atLeastOneWrite = false;
-
-  private FieldSelection selection;
+  private final MapVectorOutput mapOutput;
+  private final ListVectorOutput listOutput;
+  private final boolean extended = true;
 
   /**
-   * Whether we are in a reset state. In a reset state, we don't have to advance to the next token on write because
-   * we're already at the start of the next object
+   * Describes whether or not this reader can unwrap a single root array record and treat it like a set of distinct records.
    */
-  private boolean onReset = false;
+  private final boolean skipOuterList;
 
+  /**
+   * Whether the reader is currently in a situation where we are unwrapping an outer list.
+   */
+  private boolean inOuterList;
 
-  public JsonReader() throws IOException {
-    this(null, false);
-  }
+  private FieldSelection selection;
 
-  public JsonReader(DrillBuf managedBuf, boolean allTextMode) {
-    this(managedBuf, GroupScan.ALL_COLUMNS, allTextMode);
+  public JsonReader(DrillBuf managedBuf, boolean allTextMode, boolean skipOuterList) {
+    this(managedBuf, GroupScan.ALL_COLUMNS, allTextMode, skipOuterList);
   }
 
-  public JsonReader(DrillBuf managedBuf, List<SchemaPath> columns, boolean allTextMode) {
+  public JsonReader(DrillBuf managedBuf, List<SchemaPath> columns, boolean allTextMode, boolean skipOuterList) {
     super(managedBuf);
-
     assert Preconditions.checkNotNull(columns).size() > 0 : "json record reader requires at least a column";
     this.selection = FieldSelection.getFieldSelection(columns);
+    this.workingBuffer = new WorkingBuffer(managedBuf);
+    this.skipOuterList = skipOuterList;
     this.allTextMode = allTextMode;
     this.columns = columns;
+    this.mapOutput = new MapVectorOutput(workingBuffer);
+    this.listOutput = new ListVectorOutput(workingBuffer);
   }
 
   @Override
-  public void ensureAtLeastOneField(ComplexWriter writer){
-    if(!atLeastOneWrite){
+  public void ensureAtLeastOneField(ComplexWriter writer) {
+    if (!atLeastOneWrite) {
       // if we had no columns, create one empty one so we can return some data for count purposes.
       SchemaPath sp = columns.get(0);
       PathSegment root = sp.getRootSegment();
       BaseWriter.MapWriter fieldWriter = writer.rootAsMap();
-      while (root.getChild() != null && ! root.getChild().isArray()) {
+      while (root.getChild() != null && !root.getChild().isArray()) {
         fieldWriter = fieldWriter.map(root.getNameSegment().getPath());
         root = root.getChild();
       }
@@ -104,54 +99,50 @@ public class JsonReader extends BaseJsonProcessor implements JsonProcessor {
     }
   }
 
+  public void setSource(int start, int end, DrillBuf buf) throws IOException {
+    setSource(DrillBufInputStream.getStream(start, end, buf));
+  }
+
+
   @Override
-  public void setSource(InputStream is) throws IOException{
-    parser.setInputStream(is);
-    this.onReset = false;
+  public void setSource(InputStream is) throws IOException {
+    super.setSource(is);
+    mapOutput.setParser(parser);
+    listOutput.setParser(parser);
   }
 
-  public void setSource(int start, int end, DrillBuf buf) throws IOException{
-    parser.setInputStream(DrillBufInputStream.getStream(start, end, buf));
+  @Override
+  public void setSource(JsonNode node) {
+    super.setSource(node);
+    mapOutput.setParser(parser);
+    listOutput.setParser(parser);
   }
 
   public void setSource(String data) throws IOException {
     setSource(data.getBytes(Charsets.UTF_8));
   }
 
-  public void setSource(byte[] bytes) throws IOException{
-    parser.setInputStream(new SeekableBAIS(bytes));
-    this.onReset = false;
+  public void setSource(byte[] bytes) throws IOException {
+    setSource(new SeekableBAIS(bytes));
   }
 
-
   @Override
   public ReadState write(ComplexWriter writer) throws IOException {
-    JsonToken t = onReset ? parser.getCurrentToken() : parser.nextToken();
+    JsonToken t = parser.nextToken();
 
-    while (!parser.hasCurrentToken() && parser.hasDataAvailable()) {
+    while (!parser.hasCurrentToken() && !parser.isClosed()) {
       t = parser.nextToken();
     }
 
-    if(!parser.hasCurrentToken()){
+    if (parser.isClosed()) {
       return ReadState.END_OF_STREAM;
     }
 
-    if(onReset){
-      onReset = false;
-    }else{
-      parser.mark();
-    }
-
     ReadState readState = writeToVector(writer, t);
 
-    switch(readState){
+    switch (readState) {
     case END_OF_STREAM:
       break;
-    case WRITE_FAILURE:
-      logger.debug("Ran out of space while writing object, rewinding to object start.");
-      parser.resetToMark();
-      onReset = true;
-      break;
     case WRITE_SUCCEED:
       break;
     default:
@@ -161,72 +152,113 @@ public class JsonReader extends BaseJsonProcessor implements JsonProcessor {
     return readState;
   }
 
-  private ReadState writeToVector(ComplexWriter writer, JsonToken t) throws IOException {
-    if (!writer.ok()) {
-      return ReadState.WRITE_FAILURE;
+  private void confirmLast() throws IOException{
+    parser.nextToken();
+    if(!parser.isClosed()){
+      throw new JsonParseException("Drill attempted to unwrap a toplevel list "
+        + "in your document.  However, it appears that there is trailing content after this top level list.  Drill only "
+        + "supports querying a set of distinct maps or a single json array with multiple inner maps.", parser.getCurrentLocation());
     }
+  }
 
+  private ReadState writeToVector(ComplexWriter writer, JsonToken t) throws IOException {
     switch (t) {
-      case START_OBJECT:
-        writeDataSwitch(writer.rootAsMap());
-        break;
-      case START_ARRAY:
+    case START_OBJECT:
+      writeDataSwitch(writer.rootAsMap());
+      break;
+    case START_ARRAY:
+      if(inOuterList){
+        throw new JsonParseException("The top level of your document must either be a single array of maps or a set "
+            + "of white space delimited maps.", parser.getCurrentLocation());
+      }
+
+      if(skipOuterList){
+        t = parser.nextToken();
+        if(t == JsonToken.START_OBJECT){
+          inOuterList = true;
+          writeDataSwitch(writer.rootAsMap());
+        }else{
+          throw new JsonParseException("The top level of your document must either be a single array of maps or a set "
+              + "of white space delimited maps.", parser.getCurrentLocation());
+        }
+
+      }else{
         writeDataSwitch(writer.rootAsList());
-        break;
-      case NOT_AVAILABLE:
-        return ReadState.END_OF_STREAM;
-      default:
-        throw new JsonParseException(
-            String.format("Failure while parsing JSON.  Found token of [%s]  Drill currently only supports parsing "
-                + "json strings that contain either lists or maps.  The root object cannot be a scalar.",
-                t),
-            parser.getCurrentLocation());
       }
+      break;
+    case END_ARRAY:
 
-      if(writer.ok()){
-        return ReadState.WRITE_SUCCEED;
+      if(inOuterList){
+        confirmLast();
+        return ReadState.END_OF_STREAM;
       }else{
-        return ReadState.WRITE_FAILURE;
+        throw new JsonParseException(String.format("Failure while parsing JSON.  Ran across unexpected %s.", JsonToken.END_ARRAY), parser.getCurrentLocation());
       }
+
+    case NOT_AVAILABLE:
+      return ReadState.END_OF_STREAM;
+    default:
+      throw new JsonParseException(String.format(
+          "Failure while parsing JSON.  Found token of [%s]  Drill currently only supports parsing "
+              + "json strings that contain either lists or maps.  The root object cannot be a scalar.", t),
+          parser.getCurrentLocation());
+    }
+
+    return ReadState.WRITE_SUCCEED;
+
   }
 
-  private void writeDataSwitch(MapWriter w) throws IOException{
-    if(this.allTextMode){
-      writeDataAllText(w, this.selection);
-    }else{
-      writeData(w, this.selection);
+  private void writeDataSwitch(MapWriter w) throws IOException {
+    if (this.allTextMode) {
+      writeDataAllText(w, this.selection, true);
+    } else {
+      writeData(w, this.selection, true);
     }
   }
 
-  private void writeDataSwitch(ListWriter w) throws IOException{
-    if(this.allTextMode){
+  private void writeDataSwitch(ListWriter w) throws IOException {
+    if (this.allTextMode) {
       writeDataAllText(w);
-    }else{
+    } else {
       writeData(w);
     }
   }
 
   private void consumeEntireNextValue() throws IOException {
     switch (parser.nextToken()) {
-      case START_ARRAY:
-      case START_OBJECT:
-        parser.skipChildren();
-        return;
-      default:
-        // hit a single value, do nothing as the token was already read
-        // in the switch statement
-        return;
+    case START_ARRAY:
+    case START_OBJECT:
+      parser.skipChildren();
+      return;
+    default:
+      // hit a single value, do nothing as the token was already read
+      // in the switch statement
+      return;
     }
   }
 
-  private void writeData(MapWriter map, FieldSelection selection) throws IOException {
+  /**
+   *
+   * @param map
+   * @param selection
+   * @param moveForward
+   *          Whether or not we should start with using the current token or the next token. If moveForward = true, we
+   *          should start with the next token and ignore the current one.
+   * @throws IOException
+   */
+  private void writeData(MapWriter map, FieldSelection selection, boolean moveForward) throws IOException {
     //
     map.start();
-    outside: while(true) {
-      if (!map.ok()) {
-        return;
+    outside: while (true) {
+
+      JsonToken t;
+      if(moveForward){
+        t = parser.nextToken();
+      }else{
+        t = parser.getCurrentToken();
+        moveForward = true;
       }
-      JsonToken t = parser.nextToken();
+
       if (t == JsonToken.NOT_AVAILABLE || t == JsonToken.END_OBJECT) {
         return;
       }
@@ -234,23 +266,25 @@ public class JsonReader extends BaseJsonProcessor implements JsonProcessor {
       assert t == JsonToken.FIELD_NAME : String.format("Expected FIELD_NAME but got %s.", t.name());
 
       final String fieldName = parser.getText();
+
       FieldSelection childSelection = selection.getChild(fieldName);
-      if(childSelection.isNeverValid()){
+      if (childSelection.isNeverValid()) {
         consumeEntireNextValue();
         continue outside;
       }
 
-      switch(parser.nextToken()) {
+      switch (parser.nextToken()) {
       case START_ARRAY:
         writeData(map.list(fieldName));
         break;
       case START_OBJECT:
-        writeData(map.map(fieldName), childSelection);
+        if (!writeMapDataIfTyped(map, fieldName)) {
+          writeData(map.map(fieldName), childSelection, false);
+        }
         break;
       case END_OBJECT:
         break outside;
 
-      case VALUE_EMBEDDED_OBJECT:
       case VALUE_FALSE: {
         map.bit(fieldName).writeBit(0);
         atLeastOneWrite = true;
@@ -262,10 +296,6 @@ public class JsonReader extends BaseJsonProcessor implements JsonProcessor {
         break;
       }
       case VALUE_NULL:
-        // do check value capacity only if vector is allocated.
-        if (map.getValueCapacity() > 0) {
-          map.checkValueCapacity();
-        }
         // do nothing as we don't have a type.
         break;
       case VALUE_NUMBER_FLOAT:
@@ -285,20 +315,27 @@ public class JsonReader extends BaseJsonProcessor implements JsonProcessor {
         throw new IllegalStateException("Unexpected token " + parser.getCurrentToken());
 
       }
+
     }
     map.end();
 
   }
 
-
-  private void writeDataAllText(MapWriter map, FieldSelection selection) throws IOException {
+  private void writeDataAllText(MapWriter map, FieldSelection selection, boolean moveForward) throws IOException {
     //
     map.start();
-    outside: while(true) {
-      if (!map.ok()) {
-        return;
+    outside: while (true) {
+
+
+      JsonToken t;
+
+      if(moveForward){
+        t = parser.nextToken();
+      }else{
+        t = parser.getCurrentToken();
+        moveForward = true;
       }
-      JsonToken t = parser.nextToken();
+
       if (t == JsonToken.NOT_AVAILABLE || t == JsonToken.END_OBJECT) {
         return;
       }
@@ -307,18 +344,19 @@ public class JsonReader extends BaseJsonProcessor implements JsonProcessor {
 
       final String fieldName = parser.getText();
       FieldSelection childSelection = selection.getChild(fieldName);
-      if(childSelection.isNeverValid()){
+      if (childSelection.isNeverValid()) {
         consumeEntireNextValue();
         continue outside;
       }
 
-
-      switch(parser.nextToken()) {
+      switch (parser.nextToken()) {
       case START_ARRAY:
         writeDataAllText(map.list(fieldName));
         break;
       case START_OBJECT:
-        writeDataAllText(map.map(fieldName), childSelection);
+        if (!writeMapDataIfTyped(map, fieldName)) {
+          writeDataAllText(map.map(fieldName), childSelection, false);
+        }
         break;
       case END_OBJECT:
         break outside;
@@ -333,14 +371,9 @@ public class JsonReader extends BaseJsonProcessor implements JsonProcessor {
         atLeastOneWrite = true;
         break;
       case VALUE_NULL:
-        // do check value capacity only if vector is allocated.
-        if (map.getValueCapacity() > 0) {
-          map.checkValueCapacity();
-        }
         // do nothing as we don't have a type.
         break;
 
-
       default:
         throw new IllegalStateException("Unexpected token " + parser.getCurrentToken());
 
@@ -350,46 +383,65 @@ public class JsonReader extends BaseJsonProcessor implements JsonProcessor {
 
   }
 
-
-  private void ensure(int length) {
-    workBuf = workBuf.reallocIfNeeded(length);
+  /**
+   * Will attempt to take the current value and consume it as an extended value (if extended mode is enabled).  Whether extended is enable or disabled, will consume the next token in the stream.
+   * @param writer
+   * @param fieldName
+   * @return
+   * @throws IOException
+   */
+  private boolean writeMapDataIfTyped(MapWriter writer, String fieldName) throws IOException {
+    if (extended) {
+      return mapOutput.run(writer, fieldName);
+    } else {
+      parser.nextToken();
+      return false;
+    }
   }
 
-  private int prepareVarCharHolder(String value) throws IOException {
-    byte[] b = value.getBytes(Charsets.UTF_8);
-    ensure(b.length);
-    workBuf.setBytes(0, b);
-    return b.length;
+  /**
+   * Will attempt to take the current value and consume it as an extended value (if extended mode is enabled).  Whether extended is enable or disabled, will consume the next token in the stream.
+   * @param writer
+   * @return
+   * @throws IOException
+   */
+  private boolean writeListDataIfTyped(ListWriter writer) throws IOException {
+    if (extended) {
+      return listOutput.run(writer);
+    } else {
+      parser.nextToken();
+      return false;
+    }
   }
 
   private void handleString(JsonParser parser, MapWriter writer, String fieldName) throws IOException {
-    writer.varChar(fieldName).writeVarChar(0, prepareVarCharHolder(parser.getText()), workBuf);
+    writer.varChar(fieldName).writeVarChar(0, workingBuffer.prepareVarCharHolder(parser.getText()),
+        workingBuffer.getBuf());
   }
 
   private void handleString(JsonParser parser, ListWriter writer) throws IOException {
-    writer.varChar().writeVarChar(0, prepareVarCharHolder(parser.getText()), workBuf);
+    writer.varChar().writeVarChar(0, workingBuffer.prepareVarCharHolder(parser.getText()), workingBuffer.getBuf());
   }
 
   private void writeData(ListWriter list) throws IOException {
     list.start();
     outside: while (true) {
-      if (!list.ok()) {
-        return;
-      }
 
       switch (parser.nextToken()) {
       case START_ARRAY:
         writeData(list.list());
         break;
       case START_OBJECT:
-        writeData(list.map(), FieldSelection.ALL_VALID);
+        if (!writeListDataIfTyped(list)) {
+          writeData(list.map(), FieldSelection.ALL_VALID, false);
+        }
         break;
       case END_ARRAY:
       case END_OBJECT:
         break outside;
 
       case VALUE_EMBEDDED_OBJECT:
-      case VALUE_FALSE:{
+      case VALUE_FALSE: {
         list.bit().writeBit(0);
         atLeastOneWrite = true;
         break;
@@ -400,9 +452,9 @@ public class JsonReader extends BaseJsonProcessor implements JsonProcessor {
         break;
       }
       case VALUE_NULL:
-        throw new DrillRuntimeException("Null values are not supported in lists be default. " +
-            "Please set `store.json.all_text_mode` to true to read lists containing nulls. " +
-            "Be advised that this will treat JSON null values as string containing the word 'null'.");
+        throw new DrillRuntimeException("Drill does not currently null values in lists. "
+            + "Please set `store.json.all_text_mode` to true to read lists containing nulls. "
+            + "Be advised that this will treat JSON null values as string containing the word 'null'.");
       case VALUE_NUMBER_FLOAT:
         list.float8().writeFloat8(parser.getDoubleValue());
         atLeastOneWrite = true;
@@ -426,16 +478,15 @@ public class JsonReader extends BaseJsonProcessor implements JsonProcessor {
   private void writeDataAllText(ListWriter list) throws IOException {
     list.start();
     outside: while (true) {
-      if (!list.ok()) {
-        return;
-      }
 
       switch (parser.nextToken()) {
       case START_ARRAY:
         writeDataAllText(list.list());
         break;
       case START_OBJECT:
-        writeDataAllText(list.map(), FieldSelection.ALL_VALID);
+        if (!writeListDataIfTyped(list)) {
+          writeDataAllText(list.map(), FieldSelection.ALL_VALID, false);
+        }
         break;
       case END_ARRAY:
       case END_OBJECT:
@@ -460,7 +511,7 @@ public class JsonReader extends BaseJsonProcessor implements JsonProcessor {
   }
 
   public DrillBuf getWorkBuf() {
-    return workBuf;
+    return workingBuffer.getBuf();
   }
 
 }