You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/04/15 17:56:05 UTC

[4/6] drill git commit: DRILL-2695: Add Support for large in conditions through the use of the Values operator. Update JSON reader to support reading Extended JSON. Update JSON writer to support writing extended JSON data. Update JSON reader to automatic

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonWriter.java
index de52b73..8309bf3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonWriter.java
@@ -32,11 +32,19 @@ public class JsonWriter {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonWriter.class);
 
   private final JsonFactory factory = new JsonFactory();
-  private final JsonGenerator gen;
+  private final JsonOutput gen;
 
-  public JsonWriter(OutputStream out, boolean pretty) throws IOException{
+  public JsonWriter(OutputStream out, boolean pretty, boolean useExtendedOutput) throws IOException{
     JsonGenerator writer = factory.createJsonGenerator(out);
-    gen = pretty ? writer.useDefaultPrettyPrinter() : writer;
+    if(pretty){
+      writer = writer.useDefaultPrettyPrinter();
+    }
+    if(useExtendedOutput){
+      gen = new ExtendedJsonOutput(writer);
+    }else{
+      gen = new BasicJsonOutput(writer);
+    }
+
   }
 
   public void write(FieldReader reader) throws JsonGenerationException, IOException{
@@ -50,58 +58,45 @@ public class JsonWriter {
 
     switch(m){
     case OPTIONAL:
-      if(!reader.isSet()){
-        gen.writeNull();
-        break;
-      }
-
     case REQUIRED:
 
 
       switch (mt) {
       case FLOAT4:
-        gen.writeNumber(reader.readFloat());
+        gen.writeFloat(reader);
         break;
       case FLOAT8:
-        gen.writeNumber(reader.readDouble());
+        gen.writeDouble(reader);
         break;
       case INT:
-        Integer i = reader.readInteger();
-        if(i == null){
-          gen.writeNull();
-        }else{
-          gen.writeNumber(reader.readInteger());
-        }
+        gen.writeInt(reader);
         break;
       case SMALLINT:
-        gen.writeNumber(reader.readShort());
+        gen.writeSmallInt(reader);
         break;
       case TINYINT:
-        gen.writeNumber(reader.readByte());
+        gen.writeTinyInt(reader);
         break;
       case BIGINT:
-        Long l = reader.readLong();
-        if(l == null){
-          gen.writeNull();
-        }else{
-          gen.writeNumber(reader.readLong());
-        }
-
+        gen.writeBigInt(reader);
         break;
       case BIT:
-        gen.writeBoolean(reader.readBoolean());
+        gen.writeBoolean(reader);
         break;
 
       case DATE:
+        gen.writeDate(reader);
+        break;
       case TIME:
+        gen.writeTime(reader);
+        break;
       case TIMESTAMP:
-      case TIMESTAMPTZ:
-        gen.writeString(reader.readDateTime().toString());
-
+        gen.writeTimestamp(reader);
+        break;
       case INTERVALYEAR:
       case INTERVALDAY:
       case INTERVAL:
-        gen.writeString(reader.readPeriod().toString());
+        gen.writeInterval(reader);
         break;
       case DECIMAL28DENSE:
       case DECIMAL28SPARSE:
@@ -109,7 +104,7 @@ public class JsonWriter {
       case DECIMAL38SPARSE:
       case DECIMAL9:
       case DECIMAL18:
-        gen.writeNumber(reader.readBigDecimal());
+        gen.writeDecimal(reader);
         break;
 
       case LIST:
@@ -130,17 +125,17 @@ public class JsonWriter {
         gen.writeEndObject();
         break;
       case NULL:
-        gen.writeNull();
+        gen.writeUntypedNull();
         break;
 
       case VAR16CHAR:
-        gen.writeString(reader.readString());
+        gen.writeVar16Char(reader);
         break;
       case VARBINARY:
-        gen.writeBinary(reader.readByteArray());
+        gen.writeBinary(reader);
         break;
       case VARCHAR:
-        gen.writeString(reader.readText().toString());
+        gen.writeVarChar(reader);
         break;
 
       }
@@ -151,54 +146,61 @@ public class JsonWriter {
       switch (mt) {
       case FLOAT4:
         for(int i = 0; i < reader.size(); i++){
-          gen.writeNumber(reader.readFloat(i));
+          gen.writeFloat(i, reader);
         }
 
         break;
       case FLOAT8:
         for(int i = 0; i < reader.size(); i++){
-          gen.writeNumber(reader.readDouble(i));
+          gen.writeDouble(i, reader);
         }
         break;
       case INT:
         for(int i = 0; i < reader.size(); i++){
-          gen.writeNumber(reader.readInteger(i));
+          gen.writeInt(i, reader);
         }
         break;
       case SMALLINT:
         for(int i = 0; i < reader.size(); i++){
-          gen.writeNumber(reader.readShort(i));
+          gen.writeSmallInt(i, reader);
         }
         break;
       case TINYINT:
         for(int i = 0; i < reader.size(); i++){
-          gen.writeNumber(reader.readByte(i));
+          gen.writeTinyInt(i, reader);
         }
         break;
       case BIGINT:
         for(int i = 0; i < reader.size(); i++){
-          gen.writeNumber(reader.readLong(i));
+          gen.writeBigInt(i, reader);
         }
         break;
       case BIT:
         for(int i = 0; i < reader.size(); i++){
-        gen.writeBoolean(reader.readBoolean(i));
+          gen.writeBoolean(i, reader);
         }
         break;
 
       case DATE:
+        for(int i = 0; i < reader.size(); i++){
+          gen.writeDate(i, reader);
+        }
+        break;
       case TIME:
+        for(int i = 0; i < reader.size(); i++){
+          gen.writeTime(i, reader);
+        }
+        break;
       case TIMESTAMP:
-      case TIMESTAMPTZ:
         for(int i = 0; i < reader.size(); i++){
-        gen.writeString(reader.readDateTime(i).toString());
+          gen.writeTimestamp(i, reader);
         }
-
+        break;
       case INTERVALYEAR:
       case INTERVALDAY:
       case INTERVAL:
         for(int i = 0; i < reader.size(); i++){
-        gen.writeString(reader.readPeriod(i).toString());
+          gen.writeInterval(i, reader);
         }
         break;
       case DECIMAL28DENSE:
@@ -208,7 +210,7 @@ public class JsonWriter {
       case DECIMAL9:
       case DECIMAL18:
         for(int i = 0; i < reader.size(); i++){
-        gen.writeNumber(reader.readBigDecimal(i));
+          gen.writeDecimal(i, reader);
         }
         break;
 
@@ -237,20 +239,22 @@ public class JsonWriter {
 
       case VAR16CHAR:
         for(int i = 0; i < reader.size(); i++){
-          gen.writeString(reader.readString(i));
+          gen.writeVar16Char(i, reader);
         }
         break;
       case VARBINARY:
         for(int i = 0; i < reader.size(); i++){
-        gen.writeBinary(reader.readByteArray(i));
+          gen.writeBinary(i, reader);
         }
         break;
       case VARCHAR:
         for(int i = 0; i < reader.size(); i++){
-        gen.writeString(reader.readText(i).toString());
+          gen.writeVarChar(i, reader);
         }
         break;
 
+      default:
+        throw new IllegalStateException(String.format("Unable to handle type %s.", mt));
       }
       gen.writeEndArray();
       break;

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java
new file mode 100644
index 0000000..651de3d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex.fn;
+
+import java.io.IOException;
+
+import org.apache.drill.exec.expr.fn.impl.DateUtility;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Decimal38DenseHolder;
+import org.apache.drill.exec.expr.holders.IntervalHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.VarBinaryHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
+import org.apache.drill.exec.vector.complex.writer.BigIntWriter;
+import org.apache.drill.exec.vector.complex.writer.DateWriter;
+import org.apache.drill.exec.vector.complex.writer.IntervalWriter;
+import org.apache.drill.exec.vector.complex.writer.TimeStampWriter;
+import org.apache.drill.exec.vector.complex.writer.TimeWriter;
+import org.apache.drill.exec.vector.complex.writer.VarBinaryWriter;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+import org.joda.time.format.ISOPeriodFormat;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+
+abstract class VectorOutput {
+
+  final VarBinaryHolder binary = new VarBinaryHolder();
+  final TimeHolder time = new TimeHolder();
+  final DateHolder date = new DateHolder();
+  final TimeStampHolder timestamp = new TimeStampHolder();
+  final IntervalHolder interval = new IntervalHolder();
+  final BigIntHolder bigint = new BigIntHolder();
+  final Decimal38DenseHolder decimal = new Decimal38DenseHolder();
+  final VarCharHolder varchar = new VarCharHolder();
+
+  protected final WorkingBuffer work;
+  protected JsonParser parser;
+
+
+  public VectorOutput(WorkingBuffer work){ // work: scratch buffer that the write* implementations stage bytes in
+    this.work = work;
+  }
+
+  public void setParser(JsonParser parser){ // the parser field has no other initializer; innerRun()/checkNextToken() read from it
+    this.parser = parser;
+  }
+
+  /**
+   * Advances one token; if it is a '$'-prefixed field name equal to an ExtendedTypeName constant, reads
+   * the value token (JSON null allowed), passes the null flag to the matching write* method, runs
+   * checkNextToken(END_OBJECT) and returns true. Otherwise returns false; the token read stays consumed.
+   */
+  protected boolean innerRun() throws IOException{
+    if(parser.nextToken() != JsonToken.FIELD_NAME){
+      return false;
+    }
+    final String typeName = parser.getText();
+    if(typeName.isEmpty() || typeName.charAt(0) != '$'){
+      return false;
+    }
+    switch(typeName){
+    case ExtendedTypeName.BINARY:
+      writeBinary(checkNextToken(JsonToken.VALUE_STRING));
+      break;
+    case ExtendedTypeName.DATE:
+      writeDate(checkNextToken(JsonToken.VALUE_STRING));
+      break;
+    case ExtendedTypeName.TIME:
+      writeTime(checkNextToken(JsonToken.VALUE_STRING));
+      break;
+    case ExtendedTypeName.TIMESTAMP:
+      writeTimestamp(checkNextToken(JsonToken.VALUE_STRING));
+      break;
+    case ExtendedTypeName.INTERVAL:
+      writeInterval(checkNextToken(JsonToken.VALUE_STRING));
+      break;
+    case ExtendedTypeName.INTEGER:
+      writeInteger(checkNextToken(JsonToken.VALUE_NUMBER_INT));
+      break;
+    case ExtendedTypeName.DECIMAL:
+      writeDecimal(checkNextToken(JsonToken.VALUE_NUMBER_FLOAT, JsonToken.VALUE_NUMBER_INT));
+      break;
+    default:
+      return false;
+    }
+    checkNextToken(JsonToken.END_OBJECT);
+    return true;
+  }
+
+  public boolean checkNextToken(final JsonToken expected) throws IOException{ // single-type form: true for JSON null, false for `expected`
+    return checkNextToken(expected, expected);
+  }
+
+  // Consumes one token: true for a JSON null, false for either accepted type, else throws naming both types.
+  public boolean checkNextToken(final JsonToken expected1, final JsonToken expected2) throws IOException{
+    final JsonToken t = parser.nextToken();
+    if(t == JsonToken.VALUE_NULL){
+      return true;
+    }
+    if(t == expected1 || t == expected2){
+      return false;
+    }
+    final String expected = expected1 == expected2 ? String.valueOf(expected1) : expected1 + " or " + expected2;
+    throw new JsonParseException(String.format("Failure while reading ExtendedJSON typed value. Expected a %s but "
+        + "received a token of type %s", expected, t), parser.getCurrentLocation());
+  }
+
+  public abstract void writeBinary(boolean isNull) throws IOException;
+  public abstract void writeDate(boolean isNull) throws IOException;
+  public abstract void writeTime(boolean isNull) throws IOException;
+  public abstract void writeTimestamp(boolean isNull) throws IOException;
+  public abstract void writeInterval(boolean isNull) throws IOException;
+  public abstract void writeInteger(boolean isNull) throws IOException;
+  public abstract void writeDecimal(boolean isNull) throws IOException;
+
+  static class ListVectorOutput extends VectorOutput{
+    private ListWriter writer; // list currently being appended to; assigned by run()
+
+    public ListVectorOutput(WorkingBuffer work) {
+      super(work);
+    }
+
+    public boolean run(ListWriter writer) throws IOException{
+      this.writer = writer;
+      return innerRun();
+    }
+
+    @Override
+    public void writeBinary(boolean isNull) throws IOException {
+      // varBinary() is called even for a null; only the value write is skipped.
+      final VarBinaryWriter out = writer.varBinary();
+      if(isNull){ return; }
+      work.prepareBinary(parser.getBinaryValue(), binary);
+      out.write(binary);
+    }
+
+    @Override
+    public void writeDate(boolean isNull) throws IOException {
+      // Stages the text in the work buffer, then lets StringFunctionHelpers.getDate parse it.
+      final DateWriter out = writer.date();
+      if(isNull){ return; }
+      work.prepareVarCharHolder(parser.getValueAsString(), varchar);
+      out.writeDate(StringFunctionHelpers.getDate(varchar.buffer, varchar.start, varchar.end));
+    }
+
+    @Override
+    public void writeTime(boolean isNull) throws IOException {
+      // Parses with ISODateTimeFormat.time(), re-labels the fields as UTC, narrows the millis to int.
+      final TimeWriter out = writer.time();
+      if(isNull){ return; }
+      final DateTime parsed = ISODateTimeFormat.time().parseDateTime(parser.getValueAsString());
+      out.writeTime((int) parsed.withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis());
+    }
+
+    @Override
+    public void writeTimestamp(boolean isNull) throws IOException {
+      // Parses with ISODateTimeFormat.dateTime() and stores the millis of the same fields read as UTC.
+      final TimeStampWriter out = writer.timeStamp();
+      if(isNull){ return; }
+      final DateTime parsed = DateTime.parse(parser.getValueAsString(), ISODateTimeFormat.dateTime());
+      out.writeTimeStamp(parsed.withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis());
+    }
+
+    @Override
+    public void writeInterval(boolean isNull) throws IOException {
+      // Splits an ISO period string into the (months, days, millis) triple the interval writer takes.
+      final IntervalWriter out = writer.interval();
+      if(isNull){ return; }
+      final Period period = ISOPeriodFormat.standard().parsePeriod(parser.getValueAsString());
+      final int monthPart = DateUtility.monthsFromPeriod(period);
+      final int dayPart = period.getDays();
+      final int milliPart = DateUtility.millisFromPeriod(period);
+      out.writeInterval(monthPart, dayPart, milliPart);
+    }
+
+    @Override
+    public void writeInteger(boolean isNull) throws IOException {
+      // The integer type is written through the BIGINT writer using the parser's long value.
+      final BigIntWriter out = writer.bigInt();
+      if(isNull){ return; }
+      out.writeBigInt(parser.getLongValue());
+    }
+
+    @Override
+    public void writeDecimal(boolean isNull) throws IOException { // always throws; isNull is never read
+      throw new JsonParseException("Decimal Extended types not yet supported.", parser.getCurrentLocation());
+    }
+
+  }
+
+  static class MapVectorOutput extends VectorOutput {
+
+    private MapWriter writer;   // map currently being written to; assigned by run()
+    private String fieldName;   // name of the field the typed value is stored under; assigned by run()
+
+    public MapVectorOutput(WorkingBuffer work) {
+      super(work);
+    }
+
+    public boolean run(MapWriter writer, String fieldName) throws IOException{
+      this.fieldName = fieldName;
+      this.writer = writer;
+      return innerRun();
+    }
+
+    @Override
+    public void writeBinary(boolean isNull) throws IOException {
+      // varBinary(fieldName) is called even for a null; only the value write is skipped.
+      final VarBinaryWriter out = writer.varBinary(fieldName);
+      if(isNull){ return; }
+      work.prepareBinary(parser.getBinaryValue(), binary);
+      out.write(binary);
+    }
+
+    @Override
+    public void writeDate(boolean isNull) throws IOException {
+      // Parses with ISODateTimeFormat.date() and stores the millis of the same fields read as UTC.
+      final DateWriter out = writer.date(fieldName);
+      if(isNull){ return; }
+      final DateTimeFormatter format = ISODateTimeFormat.date();
+      final DateTime parsed = format.parseDateTime(parser.getValueAsString());
+      out.writeDate(parsed.withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis());
+    }
+
+    @Override
+    public void writeTime(boolean isNull) throws IOException {
+      // Parses with ISODateTimeFormat.time(), re-labels the fields as UTC, narrows the millis to int.
+      final TimeWriter out = writer.time(fieldName);
+      if(isNull){ return; }
+      final DateTime parsed = ISODateTimeFormat.time().parseDateTime(parser.getValueAsString());
+      out.writeTime((int) parsed.withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis());
+    }
+
+    @Override
+    public void writeTimestamp(boolean isNull) throws IOException {
+      // Parses with ISODateTimeFormat.dateTime() and stores the millis of the same fields read as UTC.
+      final TimeStampWriter out = writer.timeStamp(fieldName);
+      if(isNull){ return; }
+      final DateTime parsed = DateTime.parse(parser.getValueAsString(), ISODateTimeFormat.dateTime());
+      out.writeTimeStamp(parsed.withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis());
+    }
+
+    @Override
+    public void writeInterval(boolean isNull) throws IOException {
+      // Splits an ISO period string into the (months, days, millis) triple the interval writer takes.
+      final IntervalWriter out = writer.interval(fieldName);
+      if(isNull){ return; }
+      final Period period = ISOPeriodFormat.standard().parsePeriod(parser.getValueAsString());
+      final int monthPart = DateUtility.monthsFromPeriod(period);
+      final int dayPart = period.getDays();
+      final int milliPart = DateUtility.millisFromPeriod(period);
+      out.writeInterval(monthPart, dayPart, milliPart);
+    }
+
+    @Override
+    public void writeInteger(boolean isNull) throws IOException {
+      // The integer type is written through the BIGINT writer using the parser's long value.
+      final BigIntWriter out = writer.bigInt(fieldName);
+      if(isNull){ return; }
+      out.writeBigInt(parser.getLongValue());
+    }
+
+    @Override
+    public void writeDecimal(boolean isNull) throws IOException { // always throws, with the parser location, as ListVectorOutput does
+      throw new JsonParseException("Decimal Extended types not yet supported.", parser.getCurrentLocation());
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/WorkingBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/WorkingBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/WorkingBuffer.java
new file mode 100644
index 0000000..7d10d3b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/WorkingBuffer.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex.fn;
+
+import io.netty.buffer.DrillBuf;
+
+import java.io.IOException;
+
+import org.apache.drill.exec.expr.holders.VarBinaryHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+
+import com.google.common.base.Charsets;
+
+// Package-private scratch area: wraps one DrillBuf that every prepare* call overwrites from offset 0.
+class WorkingBuffer {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkingBuffer.class);
+
+  private DrillBuf workBuf;
+
+  public WorkingBuffer(DrillBuf workBuf) {
+    this.workBuf = workBuf;
+  }
+
+  // Reassigns workBuf to reallocIfNeeded(bytes.length), copies bytes to offset 0, returns the byte count.
+  private int stage(byte[] bytes) {
+    workBuf = workBuf.reallocIfNeeded(bytes.length);
+    workBuf.setBytes(0, bytes);
+    return bytes.length;
+  }
+
+  // Stages value as UTF-8 and points h at [0, byteLength) of the work buffer.
+  public void prepareVarCharHolder(String value, VarCharHolder h) throws IOException {
+    h.end = stage(value.getBytes(Charsets.UTF_8));
+    h.start = 0;
+    h.buffer = workBuf;
+  }
+
+  // Stages value as UTF-8 and returns its byte length; read the bytes back through getBuf().
+  public int prepareVarCharHolder(String value) throws IOException {
+    return stage(value.getBytes(Charsets.UTF_8));
+  }
+
+  // Stages b unchanged and points h at [0, b.length) of the work buffer.
+  public void prepareBinary(byte[] b, VarBinaryHolder h) throws IOException {
+    h.end = stage(b);
+    h.start = 0;
+    h.buffer = workBuf;
+  }
+
+  // Returns the current buffer reference; a later prepare* call may replace it, so do not cache it.
+  public DrillBuf getBuf(){
+    return workBuf;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java
index 5d85d0a..ec8c00b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java
@@ -17,20 +17,17 @@
  */
 package org.apache.drill.exec.vector.complex.impl;
 
-import org.apache.drill.exec.vector.complex.WriteState;
 import org.apache.drill.exec.vector.complex.writer.FieldWriter;
 
 
 abstract class AbstractBaseWriter implements FieldWriter{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractBaseWriter.class);
 
-  final WriteState state;
   final FieldWriter parent;
   private int index;
 
   public AbstractBaseWriter(FieldWriter parent) {
     super();
-    this.state = parent == null ? new WriteState() : parent.getState();
     this.parent = parent;
   }
 
@@ -38,10 +35,6 @@ abstract class AbstractBaseWriter implements FieldWriter{
     return parent;
   }
 
-  public boolean ok(){
-    return state.isOk();
-  }
-
   public boolean isRoot(){
     return parent == null;
   }
@@ -50,24 +43,10 @@ abstract class AbstractBaseWriter implements FieldWriter{
     return index;
   }
 
-  public void resetState(){
-    state.reset();
-  }
-
   public void setPosition(int index){
     this.index = index;
   }
 
-  void inform(boolean outcome){
-    if(!outcome){
-      state.fail(this);
-    }
-  }
-
-  public WriteState getState(){
-    return state;
-  }
-
   public void end(){
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
index a110dcc..a4a35e2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
@@ -56,17 +56,12 @@ public class ComplexWriterImpl extends AbstractFieldWriter implements ComplexWri
     return container.getValueCapacity();
   }
 
-  public void checkValueCapacity(){
-    inform(container.getValueCapacity() > idx());
-  }
-
   private void check(Mode... modes){
     StateTool.check(mode, modes);
   }
 
   public void reset(){
     setPosition(0);
-    resetState();
   }
 
   public void clear(){

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java
index 3d0a861..767c366 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java
@@ -48,7 +48,7 @@ public class RepeatedListReaderImpl extends AbstractFieldReader{
 
   @Override
   public void copyAsValue(ListWriter writer) {
-    if (currentOffset == NO_VALUES || writer.ok() == false) {
+    if (currentOffset == NO_VALUES) {
       return;
     }
     RepeatedListWriter impl = (RepeatedListWriter) writer;
@@ -57,7 +57,7 @@ public class RepeatedListReaderImpl extends AbstractFieldReader{
 
   @Override
   public void copyAsField(String name, MapWriter writer) {
-    if (currentOffset == NO_VALUES || writer.ok() == false) {
+    if (currentOffset == NO_VALUES) {
       return;
     }
     RepeatedListWriter impl = (RepeatedListWriter) writer.list(name);

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java
index aa98818..71a5017 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java
@@ -160,7 +160,7 @@ public class RepeatedMapReaderImpl extends AbstractFieldReader{
 
   @Override
   public void copyAsValue(MapWriter writer) {
-    if (currentOffset == NO_VALUES || writer.ok() == false) {
+    if (currentOffset == NO_VALUES) {
       return;
     }
     RepeatedMapWriter impl = (RepeatedMapWriter) writer;
@@ -168,7 +168,7 @@ public class RepeatedMapReaderImpl extends AbstractFieldReader{
   }
 
   public void copyAsValueSingle(MapWriter writer) {
-    if (currentOffset == NO_VALUES || writer.ok() == false) {
+    if (currentOffset == NO_VALUES) {
       return;
     }
     SingleMapWriter impl = (SingleMapWriter) writer;

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleMapReaderImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleMapReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleMapReaderImpl.java
index ec6009e..1b39775 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleMapReaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleMapReaderImpl.java
@@ -93,18 +93,14 @@ public class SingleMapReaderImpl extends AbstractFieldReader{
 
   @Override
   public void copyAsValue(MapWriter writer){
-    if (writer.ok()) {
-      SingleMapWriter impl = (SingleMapWriter) writer;
-      impl.container.copyFromSafe(idx(), impl.idx(), vector);
-    }
+    SingleMapWriter impl = (SingleMapWriter) writer;
+    impl.container.copyFromSafe(idx(), impl.idx(), vector);
   }
 
   @Override
   public void copyAsField(String name, MapWriter writer){
-    if (writer.ok()) {
-      SingleMapWriter impl = (SingleMapWriter) writer.map(name);
-      impl.container.copyFromSafe(idx(), impl.idx(), vector);
-    }
+    SingleMapWriter impl = (SingleMapWriter) writer.map(name);
+    impl.container.copyFromSafe(idx(), impl.idx(), vector);
   }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
index c3c9354..6b6ab46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
@@ -48,17 +48,12 @@ public class VectorContainerWriter extends AbstractFieldWriter implements Comple
     return mapRoot.getValueCapacity();
   }
 
-  public void checkValueCapacity(){
-    inform(getValueCapacity() > idx());
-  }
-
   public MapVector getMapVector() {
     return mapVector;
   }
 
   public void reset() {
     setPosition(0);
-    resetState();
   }
 
   public void clear() {

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestLargeInClause.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestLargeInClause.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestLargeInClause.java
new file mode 100644
index 0000000..1e29214
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestLargeInClause.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.filter;
+
+import org.apache.drill.BaseTestQuery;
+import org.junit.Test;
+
+/**
+ * Exercises queries whose WHERE clause contains a large IN list.
+ *
+ * <p>Each test only submits its query through {@code test(...)}; the returned rows are not
+ * asserted on here.
+ */
+public class TestLargeInClause extends BaseTestQuery {
+
+  /**
+   * Builds the text of an IN list holding the integers {@code 0} through {@code size - 1}.
+   *
+   * @param size number of integer literals to emit
+   * @return the literals separated by {@code ", "}, or an empty string if {@code size <= 0}
+   */
+  private static String getInIntList(int size){
+    // The buffer never leaves this method, so the unsynchronized StringBuilder is sufficient.
+    StringBuilder sb = new StringBuilder();
+    for(int i =0; i < size; i++){
+      if(i != 0){
+        sb.append(", ");
+      }
+      sb.append(i);
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Builds the text of an IN list that repeats the literal {@code DATE '1961-08-26'}.
+   *
+   * @param size number of date literals to emit
+   * @return the literals separated by {@code ", "}, or an empty string if {@code size <= 0}
+   */
+  private static String getInDateList(int size){
+    StringBuilder sb = new StringBuilder();
+    for(int i =0; i < size; i++){
+      if(i != 0){
+        sb.append(", ");
+      }
+      sb.append("DATE '1961-08-26'");
+    }
+    return sb.toString();
+  }
+
+  /** Filters {@code employee.json} on {@code id} with an IN list of 300 integers. */
+  @Test
+  public void queryWith300InConditions() throws Exception {
+    test("select * from cp.`employee.json` where id in (" + getInIntList(300) + ")");
+  }
+
+  /** Filters {@code employee.json} on {@code id} with an IN list of 50000 integers. */
+  @Test
+  public void queryWith50000InConditions() throws Exception {
+    test("select * from cp.`employee.json` where id in (" + getInIntList(50000) + ")");
+  }
+
+  /**
+   * Filters {@code employee.json} on {@code birth_date} (cast to date) with an IN list of dates.
+   *
+   * <p>NOTE(review): the method name says 50000, but the list built below has 500 entries;
+   * confirm which size was intended.
+   */
+  @Test
+  public void queryWith50000DateInConditions() throws Exception {
+    test("select * from cp.`employee.json` where cast(birth_date as date) in (" + getInDateList(500) + ")");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
index c4bfcce..8b09e80 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
@@ -34,6 +34,11 @@ public class TestJsonRecordReader extends BaseTestQuery{
   }
 
   @Test
+  public void testContainingArray() throws Exception {
+    test("select * from dfs.`${WORKING_PATH}/src/test/resources/store/json/listdoc.json`");
+  }
+
+  @Test
   public void testComplexMultipleTimes() throws Exception{
     for(int i =0 ; i < 5; i++){
     test("select * from cp.`join/merge_join.json`");

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestExtendedTypes.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestExtendedTypes.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestExtendedTypes.java
new file mode 100644
index 0000000..9d0af41
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestExtendedTypes.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex.writer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.util.TestTools;
+import org.junit.Test;
+
+/**
+ * Round-trip test for extended JSON: reads a file of extended-type values, writes it back out as
+ * a JSON table, and compares the written file with the original.
+ */
+public class TestExtendedTypes extends BaseTestQuery {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestExtendedTypes.class);
+
+  /**
+   * Creates a JSON table from {@code extended.json}, queries it, and asserts that the output
+   * file {@code 0_0_0.json} has exactly the same text as the source file.
+   *
+   * <p>NOTE(review): the {@code store.format} session option is changed and not restored here;
+   * confirm that this cannot leak into tests that run afterwards.
+   */
+  @Test
+  public void checkReadWriteExtended() throws Exception {
+
+    // Built as a concrete path because it is used both in SQL text and with java.nio below.
+    final String originalFile =
+        TestTools.getWorkingPath() + "/src/test/resources/vector/complex/extended.json";
+
+    final String newTable = "TestExtendedTypes/newjson";
+    testNoResult("ALTER SESSION SET `store.format` = 'json'");
+
+    // create table
+    test("create table dfs_test.tmp.`%s` as select * from dfs.`%s`", newTable, originalFile);
+
+    // check query of table.
+    test("select * from dfs_test.tmp.`%s`", newTable);
+
+    // check that original file and new file match.
+    // Decode with an explicit charset so the comparison does not depend on the platform default.
+    final byte[] originalData = Files.readAllBytes(Paths.get(originalFile));
+    final byte[] newData = Files.readAllBytes(Paths.get(BaseTestQuery.getDfsTestTmpSchemaLocation() + '/' + newTable
+        + "/0_0_0.json"));
+    assertEquals(
+        new String(originalData, StandardCharsets.UTF_8),
+        new String(newData, StandardCharsets.UTF_8));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java
index 098c7de..6e2a2b5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java
@@ -184,7 +184,6 @@ public class TestRepeated {
 
       map.end();
     }
-    assert writer.ok();
 
     {
       writer.setPosition(1);
@@ -247,7 +246,7 @@ public class TestRepeated {
 
 
     ByteArrayOutputStream stream = new ByteArrayOutputStream();
-    JsonWriter jsonWriter = new JsonWriter(stream, true);
+    JsonWriter jsonWriter = new JsonWriter(stream, true, true);
     FieldReader reader = v.getChild("col", MapVector.class).getReader();
     reader.setPosition(0);
     jsonWriter.write(reader);

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/test/resources/store/json/listdoc.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/json/listdoc.json b/exec/java-exec/src/test/resources/store/json/listdoc.json
new file mode 100644
index 0000000..3b35e7a
--- /dev/null
+++ b/exec/java-exec/src/test/resources/store/json/listdoc.json
@@ -0,0 +1,4 @@
+[
+  {a: 4, b:6},
+  {a: 5, b:7}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/314e5a2a/exec/java-exec/src/test/resources/vector/complex/extended.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/vector/complex/extended.json b/exec/java-exec/src/test/resources/vector/complex/extended.json
new file mode 100644
index 0000000..16f3528
--- /dev/null
+++ b/exec/java-exec/src/test/resources/vector/complex/extended.json
@@ -0,0 +1,41 @@
+{
+  "bin" : {
+    "$binary" : "ZHJpbGw="
+  },
+  "drill_date" : {
+    "$dateDay" : "1997-07-16"
+  },
+  "drill_timestamp" : {
+    "$date" : "2009-02-23T08:00:00.000Z"
+  },
+  "time" : {
+    "$time" : "19:20:30.450Z"
+  },
+  "interval" : {
+    "$interval" : "PT26.400S"
+  },
+  "integer" : {
+    "$numberLong" : 4
+  },
+  "inner" : {
+    "bin" : {
+      "$binary" : "ZHJpbGw="
+    },
+    "drill_date" : {
+      "$dateDay" : "1997-07-16"
+    },
+    "drill_timestamp" : {
+      "$date" : "2009-02-23T08:00:00.000Z"
+    },
+    "time" : {
+      "$time" : "19:20:30.450Z"
+    },
+    "interval" : {
+      "$interval" : "PT26.400S"
+    },
+    "integer" : {
+      "$numberLong" : 4
+    }
+  },
+  "other1" : 3.3
+}
\ No newline at end of file