You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2015/07/29 03:40:54 UTC

[2/2] drill git commit: DRILL-3353: Fix dropping nested fields

DRILL-3353: Fix dropping nested fields

- Use the SchemaChangeCallBack in more places to track schema changes.
- Reset the ephemeral transfer pair when making a new transfer pair for a MapVector or RepeatedMapVector.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/496f1466
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/496f1466
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/496f1466

Branch: refs/heads/master
Commit: 496f14669b485d5cd51b1f2a742b90de794190a9
Parents: 5e33a28
Author: Steven Phillips <sm...@apache.org>
Authored: Wed Jul 8 17:35:09 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Tue Jul 28 18:13:22 2015 -0700

----------------------------------------------------------------------
 .../src/main/codegen/templates/TypeHelper.java   |  4 ++--
 .../drill/exec/physical/impl/OutputMutator.java  |  7 +++++++
 .../drill/exec/physical/impl/ScanBatch.java      | 11 +++++++++++
 .../drill/exec/physical/impl/TopN/TopNBatch.java |  2 +-
 .../physical/impl/filter/FilterRecordBatch.java  |  2 +-
 .../physical/impl/limit/LimitRecordBatch.java    |  2 +-
 .../impl/project/ProjectRecordBatch.java         |  4 ++--
 .../impl/svremover/RemovingRecordBatch.java      | 12 +++++++-----
 .../exec/store/parquet/ParquetRecordWriter.java  | 15 ++++++++++++++-
 .../exec/vector/complex/AbstractMapVector.java   |  7 ++++---
 .../drill/exec/vector/complex/MapVector.java     | 11 ++++++++---
 .../exec/vector/complex/RepeatedMapVector.java   |  1 +
 .../complex/impl/VectorContainerWriter.java      |  7 ++++---
 .../drill/exec/store/TestOutputMutator.java      |  6 ++++++
 .../exec/store/json/TestJsonRecordReader.java    | 19 +++++++++++++++++++
 .../store/parquet/ParquetRecordReaderTest.java   |  8 +++++++-
 .../test/resources/jsoninput/drill_3353/a.json   |  3 +++
 .../test/resources/jsoninput/drill_3353/b.json   |  3 +++
 18 files changed, 101 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/496f1466/exec/java-exec/src/main/codegen/templates/TypeHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/TypeHelper.java b/exec/java-exec/src/main/codegen/templates/TypeHelper.java
index d6ccd3a..9c66cb7 100644
--- a/exec/java-exec/src/main/codegen/templates/TypeHelper.java
+++ b/exec/java-exec/src/main/codegen/templates/TypeHelper.java
@@ -91,10 +91,10 @@ public class TypeHelper {
     throw new UnsupportedOperationException(buildErrorMessage("find sql accessor", type));
   }
   
-  public static ValueVector getNewVector(SchemaPath parentPath, String name, BufferAllocator allocator, MajorType type){
+  public static ValueVector getNewVector(SchemaPath parentPath, String name, BufferAllocator allocator, MajorType type, CallBack callback){
     SchemaPath child = parentPath.getChild(name);
     MaterializedField field = MaterializedField.create(child, type);
-    return getNewVector(field, allocator);
+    return getNewVector(field, allocator, callback);
   }
   
   

http://git-wip-us.apache.org/repos/asf/drill/blob/496f1466/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
index 0fe79d9..e109ec0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
@@ -21,6 +21,7 @@ import io.netty.buffer.DrillBuf;
 
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.util.CallBack;
 import org.apache.drill.exec.vector.ValueVector;
 
 /**
@@ -61,4 +62,10 @@ public interface OutputMutator {
    * @return A DrillBuf that will be released at the end of the current query (and can be resized as desired during use).
    */
   public DrillBuf getManagedBuffer();
+
+  /**
+   *
+   * @return the CallBack object for this mutator
+   */
+  public CallBack getCallBack();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/496f1466/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 4b91e1f..873ae76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -51,6 +51,7 @@ import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.testing.ControlsInjector;
 import org.apache.drill.exec.testing.ControlsInjectorFactory;
+import org.apache.drill.exec.util.CallBack;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
@@ -220,6 +221,11 @@ public class ScanBatch implements CloseableRecordBatch {
       hasReadNonEmptyFile = true;
       populatePartitionVectors();
 
+      for (VectorWrapper w : container) {
+        w.getValueVector().getMutator().setValueCount(recordCount);
+      }
+
+
       // this is a slight misuse of this metric but it will allow Readers to report how many records they generated.
       final boolean isNewSchema = mutator.isNewSchema();
       oContext.getStats().batchReceived(0, getRecordCount(), isNewSchema);
@@ -354,6 +360,11 @@ public class ScanBatch implements CloseableRecordBatch {
     public DrillBuf getManagedBuffer() {
       return oContext.getManagedBuffer();
     }
+
+    @Override
+    public CallBack getCallBack() {
+      return callBack;
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/496f1466/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 516b028..10f1d7f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -276,7 +276,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     SimpleRecordBatch batch = new SimpleRecordBatch(c, selectionVector4, context);
     SimpleRecordBatch newBatch = new SimpleRecordBatch(newContainer, null, context);
     if (copier == null) {
-      copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(),  newContainer, newBatch);
+      copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(),  newContainer, newBatch, null);
     } else {
       for (VectorWrapper<?> i : batch) {
 

http://git-wip-us.apache.org/repos/asf/drill/blob/496f1466/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index 5eee9df..c1d78c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -193,7 +193,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
     cg.addExpr(new ReturnValueExpression(expr));
 
     for (VectorWrapper<?> v : incoming) {
-      TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField()));
+      TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField(), callBack));
       transfers.add(pair);
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/496f1466/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index d9330ea..4ea5a5c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -63,7 +63,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
 
 
     for(VectorWrapper<?> v : incoming){
-      TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField()));
+      TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField(), callBack));
       transfers.add(pair);
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/496f1466/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index b6e5dc0..5b5c90d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -325,7 +325,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
                 continue;
               }
               final FieldReference ref = new FieldReference(name);
-              final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vvIn.getField().getType()));
+              final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vvIn.getField().getType()), callBack);
               final TransferPair tp = vvIn.makeTransferPair(vvOut);
               transfers.add(tp);
             }
@@ -399,7 +399,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
         Preconditions.checkNotNull(incoming);
 
         final FieldReference ref = getRef(namedExpression);
-        final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vectorRead.getMajorType()));
+        final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vectorRead.getMajorType()), callBack);
         final TransferPair tp = vvIn.makeTransferPair(vvOut);
         transfers.add(tp);
         transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);

http://git-wip-us.apache.org/repos/asf/drill/blob/496f1466/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 57e7b55..b5b1b0a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -34,7 +34,9 @@ import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.util.CallBack;
 import org.apache.drill.exec.vector.CopyUtil;
+import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.base.Preconditions;
@@ -194,7 +196,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
     @Override
     public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing){
       for(VectorWrapper<?> vv : incoming){
-        TransferPair tp = vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField()));
+        TransferPair tp = vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField(), callBack));
         pairs.add(tp);
       }
     }
@@ -220,7 +222,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
     Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
 
     for(VectorWrapper<?> vv : incoming){
-      TransferPair tp = vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField()));
+      TransferPair tp = vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField(), callBack));
     }
 
     try {
@@ -237,14 +239,14 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
 
   private Copier getGenerated4Copier() throws SchemaChangeException {
     Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE);
-    return getGenerated4Copier(incoming, context, oContext.getAllocator(), container, this);
+    return getGenerated4Copier(incoming, context, oContext.getAllocator(), container, this, callBack);
   }
 
-  public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, BufferAllocator allocator, VectorContainer container, RecordBatch outgoing) throws SchemaChangeException{
+  public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, BufferAllocator allocator, VectorContainer container, RecordBatch outgoing, SchemaChangeCallBack callBack) throws SchemaChangeException{
 
     for(VectorWrapper<?> vv : batch){
       ValueVector v = vv.getValueVectors()[0];
-      v.makeTransferPair(container.addOrGet(v.getField()));
+      v.makeTransferPair(container.addOrGet(v.getField(), callBack));
     }
 
     try {

http://git-wip-us.apache.org/repos/asf/drill/blob/496f1466/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 12b15a9..f118535 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -143,9 +143,22 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     enableDictionary = Boolean.parseBoolean(writerOptions.get(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING));
   }
 
+  private boolean containsComplexVectors(BatchSchema schema) {
+    for (MaterializedField field : schema) {
+      MinorType type = field.getType().getMinorType();
+      switch (type) {
+      case MAP:
+      case LIST:
+        return true;
+      default:
+      }
+    }
+    return false;
+  }
+
   @Override
   public void updateSchema(VectorAccessible batch) throws IOException {
-    if (this.batchSchema == null || !this.batchSchema.equals(batch.getSchema())) {
+    if (this.batchSchema == null || !this.batchSchema.equals(batch.getSchema()) || containsComplexVectors(this.batchSchema)) {
       if (this.batchSchema != null) {
         flush();
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/496f1466/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
index 1df4b81..efba46d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
@@ -41,9 +41,10 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
   private final MapWithOrdinal<String, ValueVector> vectors =  new MapWithOrdinal<>();
 
   protected AbstractMapVector(MaterializedField field, BufferAllocator allocator, CallBack callBack) {
-    super(field, allocator, callBack);
+    super(field.clone(), allocator, callBack);
+    MaterializedField clonedField = field.clone();
     // create the hierarchy of the child vectors based on the materialized field
-    for (MaterializedField child : field.getChildren()) {
+    for (MaterializedField child : clonedField.getChildren()) {
       if (!child.equals(BaseRepeatedValueVector.OFFSETS_FIELD)) {
         String fieldName = child.getLastName();
         ValueVector v = TypeHelper.getNewVector(child, allocator, callBack);
@@ -116,7 +117,7 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
       create = true;
     }
     if (create) {
-      final T vector = (T) TypeHelper.getNewVector(field.getPath(), name, allocator, type);
+      final T vector = (T) TypeHelper.getNewVector(field.getPath(), name, allocator, type, callBack);
       putChild(name, vector);
       if (callBack!=null) {
         callBack.doWork();

http://git-wip-us.apache.org/repos/asf/drill/blob/496f1466/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
index 3032aac..1e30ea2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
@@ -161,6 +161,8 @@ public class MapVector extends AbstractMapVector {
       this.from = from;
       this.to = to;
       this.pairs = new TransferPair[from.size()];
+      this.to.ephPair = null;
+      this.to.ephPair2 = null;
 
       int i = 0;
       ValueVector vector;
@@ -294,9 +296,12 @@ public class MapVector extends AbstractMapVector {
     public Object getObject(int index) {
       Map<String, Object> vv = new JsonStringHashMap();
       for (String child:getChildFieldNames()) {
-        Object value = getChild(child).getAccessor().getObject(index);
-        if (value != null) {
-          vv.put(child, value);
+        ValueVector v = getChild(child);
+        if (v != null) {
+          Object value = v.getAccessor().getObject(index);
+          if (value != null) {
+            vv.put(child, value);
+          }
         }
       }
       return vv;

http://git-wip-us.apache.org/repos/asf/drill/blob/496f1466/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index 97f5b39..644e5db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -326,6 +326,7 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedValu
       this.from = from;
       this.to = to;
       this.pairs = new TransferPair[from.size()];
+      this.to.ephPair = null;
 
       int i = 0;
       ValueVector vector;

http://git-wip-us.apache.org/repos/asf/drill/blob/496f1466/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
index 6b6ab46..5aea0ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
@@ -21,6 +21,7 @@ import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.util.CallBack;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
@@ -35,7 +36,7 @@ public class VectorContainerWriter extends AbstractFieldWriter implements Comple
   public VectorContainerWriter(OutputMutator mutator) {
     super(null);
     this.mutator = mutator;
-    this.mapVector = new SpecialMapVector();
+    this.mapVector = new SpecialMapVector(mutator.getCallBack());
     this.mapRoot = new SingleMapWriter(mapVector, this);
   }
 
@@ -81,8 +82,8 @@ public class VectorContainerWriter extends AbstractFieldWriter implements Comple
 
   private class SpecialMapVector extends MapVector {
 
-    public SpecialMapVector() {
-      super("", null, null);
+    public SpecialMapVector(CallBack callback) {
+      super("", null, callback);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/496f1466/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java
index 0509b7b..e3591b6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.util.CallBack;
 import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.collect.Maps;
@@ -92,4 +93,9 @@ public class TestOutputMutator implements OutputMutator, Iterable<VectorWrapper<
     return allocator.buffer(255);
   }
 
+  @Override
+  public CallBack getCallBack() {
+    return null;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/496f1466/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
index bb1af9e..c1fb928 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.json;
 
 import org.apache.drill.BaseTestQuery;
+import org.apache.drill.TestBuilder;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.junit.Test;
@@ -154,4 +155,22 @@ public class TestJsonRecordReader extends BaseTestQuery{
       testNoResult("alter session set `store.json.read_numbers_as_double`= false");
     }
   }
+
+  @Test
+  public void drill_3353() throws Exception {
+    try {
+      testNoResult("alter session set `store.json.all_text_mode` = true");
+      test("create table dfs_test.tmp.drill_3353 as select a from dfs.`${WORKING_PATH}/src/test/resources/jsoninput/drill_3353` where e = true");
+      String query = "select t.a.d cnt from dfs_test.tmp.drill_3353 t where t.a.d is not null";
+      test(query);
+      testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("cnt")
+          .baselineValues("1")
+          .go();
+    } finally {
+      testNoResult("alter session set `store.json.all_text_mode` = false");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/496f1466/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 61380cf..af1b896 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -55,6 +55,7 @@ import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.CachedSingleFileSystem;
 import org.apache.drill.exec.store.TestOutputMutator;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
+import org.apache.drill.exec.util.CallBack;
 import org.apache.drill.exec.vector.BigIntVector;
 import org.apache.drill.exec.vector.NullableBigIntVector;
 import org.apache.drill.exec.vector.ValueVector;
@@ -363,7 +364,12 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
     public DrillBuf getManagedBuffer() {
       return allocator.buffer(255);
     }
-  }
+
+   @Override
+   public CallBack getCallBack() {
+     return null;
+   }
+ }
 
   private void validateFooters(final List<Footer> metadata) {
     logger.debug(metadata.toString());

http://git-wip-us.apache.org/repos/asf/drill/blob/496f1466/exec/java-exec/src/test/resources/jsoninput/drill_3353/a.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/jsoninput/drill_3353/a.json b/exec/java-exec/src/test/resources/jsoninput/drill_3353/a.json
new file mode 100644
index 0000000..0ffb7d3
--- /dev/null
+++ b/exec/java-exec/src/test/resources/jsoninput/drill_3353/a.json
@@ -0,0 +1,3 @@
+{ a : { b : 1, c : 1 }, e : false } 
+{ a : { b : 1, c : 1 }, e : false } 
+{ a : { b : 1, c : 1 }, e : true  } 

http://git-wip-us.apache.org/repos/asf/drill/blob/496f1466/exec/java-exec/src/test/resources/jsoninput/drill_3353/b.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/jsoninput/drill_3353/b.json b/exec/java-exec/src/test/resources/jsoninput/drill_3353/b.json
new file mode 100644
index 0000000..56a4b3b
--- /dev/null
+++ b/exec/java-exec/src/test/resources/jsoninput/drill_3353/b.json
@@ -0,0 +1,3 @@
+{ a : { b : 1, d : 1 }, e : false } 
+{ a : { b : 1, d : 1 }, e : false } 
+{ a : { b : 1, d : 1 }, e : true  }