You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/02 05:03:10 UTC

[10/13] git commit: Fix JsonRecordReader nested test

Fix JsonRecordReader nested test


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/bff8ab65
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/bff8ab65
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/bff8ab65

Branch: refs/heads/master
Commit: bff8ab65c83f5256e8fc95ea58261071a161deb2
Parents: fdb5c41
Author: Timothy Chen <tn...@gmail.com>
Authored: Tue Jul 23 21:20:06 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Aug 1 19:30:47 2013 -0700

----------------------------------------------------------------------
 .../drill/exec/store/JSONRecordReader.java      | 683 ++++++++++---------
 .../drill/exec/store/JSONRecordReaderTest.java  |  84 +--
 2 files changed, 369 insertions(+), 398 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bff8ab65/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
index f1b5538..f72b519 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
@@ -31,13 +31,7 @@ import org.apache.drill.exec.schema.OrderedField;
 import org.apache.drill.exec.schema.RecordSchema;
 import org.apache.drill.exec.schema.SchemaIdGenerator;
 import org.apache.drill.exec.schema.json.jackson.JacksonHelper;
-import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.NullableBitVector;
-import org.apache.drill.exec.vector.NullableFloat4Vector;
-import org.apache.drill.exec.vector.NullableIntVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.vector.TypeHelper;
-import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.*;
 
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
@@ -51,377 +45,384 @@ import com.google.common.io.InputSupplier;
 import com.google.common.io.Resources;
 
 public class JSONRecordReader implements RecordReader {
-    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
-    private static final int DEFAULT_LENGTH = 256 * 1024; // 256kb
-    public static final Charset UTF_8 = Charset.forName("UTF-8");
-
-    private final String inputPath;
-
-    private final Map<Field, VectorHolder> valueVectorMap;
-
-    private JsonParser parser;
-    private SchemaIdGenerator generator;
-    private DiffSchema diffSchema;
-    private RecordSchema currentSchema;
-    private List<Field> removedFields;
-    private OutputMutator outputMutator;
-    private BufferAllocator allocator;
-    private int batchSize;
-
-    public JSONRecordReader(FragmentContext fragmentContext, String inputPath, int batchSize) {
-        this.inputPath = inputPath;
-        this.allocator = fragmentContext.getAllocator();
-        this.batchSize = batchSize;
-        valueVectorMap = Maps.newHashMap();
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
+  private static final int DEFAULT_LENGTH = 256 * 1024; // 256kb
+  public static final Charset UTF_8 = Charset.forName("UTF-8");
+
+  private final String inputPath;
+
+  private final Map<Field, VectorHolder> valueVectorMap;
+
+  private JsonParser parser;
+  private SchemaIdGenerator generator;
+  private DiffSchema diffSchema;
+  private RecordSchema currentSchema;
+  private List<Field> removedFields;
+  private OutputMutator outputMutator;
+  private BufferAllocator allocator;
+  private int batchSize;
+
+  public JSONRecordReader(FragmentContext fragmentContext, String inputPath, int batchSize) {
+    this.inputPath = inputPath;
+    this.allocator = fragmentContext.getAllocator();
+    this.batchSize = batchSize;
+    valueVectorMap = Maps.newHashMap();
+  }
+
+  public JSONRecordReader(FragmentContext fragmentContext, String inputPath) {
+    this(fragmentContext, inputPath, DEFAULT_LENGTH);
+  }
+
+  private JsonParser getParser() {
+    return parser;
+  }
+
+  @Override
+  public void setup(OutputMutator output) throws ExecutionSetupException {
+    outputMutator = output;
+    currentSchema = new ObjectSchema();
+    diffSchema = new DiffSchema();
+    removedFields = Lists.newArrayList();
+
+    try {
+      InputSupplier<InputStreamReader> input;
+      if (inputPath.startsWith("resource:")) {
+        input = Resources.newReaderSupplier(Resources.getResource(inputPath.substring(9)), Charsets.UTF_8);
+      } else {
+        input = Files.newReaderSupplier(new File(inputPath), Charsets.UTF_8);
+      }
+
+      JsonFactory factory = new JsonFactory();
+      parser = factory.createJsonParser(input.getInput());
+      parser.nextToken(); // Read to the first START_OBJECT token
+      generator = new SchemaIdGenerator();
+    } catch (IOException e) {
+      throw new ExecutionSetupException(e);
     }
+  }
 
-    public JSONRecordReader(FragmentContext fragmentContext, String inputPath) {
-        this(fragmentContext, inputPath, DEFAULT_LENGTH);
+  @Override
+  public int next() {
+    if (parser.isClosed() || !parser.hasCurrentToken()) {
+      return 0;
     }
 
-    private JsonParser getParser() {
-        return parser;
-    }
+    resetBatch();
 
-    @Override
-    public void setup(OutputMutator output) throws ExecutionSetupException {
-        outputMutator = output;
-        currentSchema = new ObjectSchema();
-        diffSchema = new DiffSchema();
-        removedFields = Lists.newArrayList();
-
-        try {
-            InputSupplier<InputStreamReader> input;
-            if (inputPath.startsWith("resource:")) {
-                input = Resources.newReaderSupplier(Resources.getResource(inputPath.substring(9)), Charsets.UTF_8);
-            } else {
-                input = Files.newReaderSupplier(new File(inputPath), Charsets.UTF_8);
-            }
-
-            JsonFactory factory = new JsonFactory();
-            parser = factory.createJsonParser(input.getInput());
-            parser.nextToken(); // Read to the first START_OBJECT token
-            generator = new SchemaIdGenerator();
-        } catch (IOException e) {
-            throw new ExecutionSetupException(e);
-        }
-    }
-
-    @Override
-    public int next() {
-        if (parser.isClosed() || !parser.hasCurrentToken()) {
-            return 0;
-        }
+    int nextRowIndex = 0;
 
-        resetBatch();
+    try {
+      while (ReadType.OBJECT.readRecord(null, this, null, nextRowIndex++)) {
+        parser.nextToken(); // Read to START_OBJECT token
 
-        int nextRowIndex = 0;
-
-        try {
-            while (ReadType.OBJECT.readRecord(null, this, null, nextRowIndex++)) {
-                parser.nextToken(); // Read to START_OBJECT token
-
-                if (!parser.hasCurrentToken()) {
-                    parser.close();
-                    break;
-                }
-            }
+        if (!parser.hasCurrentToken()) {
+          parser.close();
+          break;
+        }
+      }
 
-            parser.nextToken();
+      parser.nextToken();
 
-            if (!parser.hasCurrentToken()) {
-                parser.close();
-            }
+      if (!parser.hasCurrentToken()) {
+        parser.close();
+      }
 
-            // Garbage collect fields never referenced in this batch
-            for (Field field : Iterables.concat(currentSchema.removeUnreadFields(), removedFields)) {
-                diffSchema.addRemovedField(field);
-                outputMutator.removeField(field.getAsMaterializedField());
-            }
+      // Garbage collect fields never referenced in this batch
+      for (Field field : Iterables.concat(currentSchema.removeUnreadFields(), removedFields)) {
+        diffSchema.addRemovedField(field);
+        outputMutator.removeField(field.getAsMaterializedField());
+      }
 
-        } catch (IOException | SchemaChangeException e) {
-            logger.error("Error reading next in Json reader", e);
-        }
-        return nextRowIndex;
+    } catch (IOException | SchemaChangeException e) {
+      logger.error("Error reading next in Json reader", e);
     }
+    return nextRowIndex;
+  }
 
-    private void resetBatch() {
-        for (VectorHolder value : valueVectorMap.values()) {
-            value.reset();
-        }
-
-        currentSchema.resetMarkedFields();
-        diffSchema.reset();
-        removedFields.clear();
+  private void resetBatch() {
+    for (VectorHolder value : valueVectorMap.values()) {
+      value.reset();
     }
 
-    @Override
-    public void cleanup() {
-        try {
-            parser.close();
-        } catch (IOException e) {
-            logger.warn("Error closing Json parser", e);
-        }
+    currentSchema.resetMarkedFields();
+    diffSchema.reset();
+    removedFields.clear();
+  }
+
+  @Override
+  public void cleanup() {
+    try {
+      parser.close();
+    } catch (IOException e) {
+      logger.warn("Error closing Json parser", e);
     }
-
-
-    private RecordSchema getCurrentSchema() {
-        return currentSchema;
+  }
+
+
+  private RecordSchema getCurrentSchema() {
+    return currentSchema;
+  }
+
+  private void setCurrentSchema(RecordSchema schema) {
+    currentSchema = schema;
+  }
+
+  private List<Field> getRemovedFields() {
+    return removedFields;
+  }
+
+  private DiffSchema getDiffSchema() {
+    return diffSchema;
+  }
+
+  public BufferAllocator getAllocator() {
+    return allocator;
+  }
+
+  public OutputMutator getOutputMutator() {
+    return outputMutator;
+  }
+
+  public static enum ReadType {
+    ARRAY(END_ARRAY) {
+      @Override
+      public Field createField(RecordSchema parentSchema, String prefixFieldName, String fieldName, MajorType fieldType, int index) {
+        return new OrderedField(parentSchema, fieldType, prefixFieldName, index);
+      }
+
+      @Override
+      public RecordSchema createSchema() throws IOException {
+        return new ListSchema();
+      }
+    },
+    OBJECT(END_OBJECT) {
+      @Override
+      public Field createField(RecordSchema parentSchema,
+                               String prefixFieldName,
+                               String fieldName,
+                               MajorType fieldType,
+                               int index) {
+        return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
+      }
+
+      @Override
+      public RecordSchema createSchema() throws IOException {
+        return new ObjectSchema();
+      }
+    };
+
+    private final JsonToken endObject;
+
+    ReadType(JsonToken endObject) {
+      this.endObject = endObject;
     }
 
-    private void setCurrentSchema(RecordSchema schema) {
-        currentSchema = schema;
+    public JsonToken getEndObject() {
+      return endObject;
     }
 
-    private List<Field> getRemovedFields() {
-        return removedFields;
-    }
+    public boolean readRecord(Field parentField,
+                              JSONRecordReader reader,
+                              String prefixFieldName,
+                              int rowIndex) throws IOException, SchemaChangeException {
+      JsonParser parser = reader.getParser();
+      JsonToken token = parser.nextToken();
+      JsonToken endObject = getEndObject();
+      int colIndex = 0;
+      boolean isFull = false;
+      while (token != endObject) {
+        if (token == FIELD_NAME) {
+          token = parser.nextToken();
+          continue;
+        }
 
-    private DiffSchema getDiffSchema() {
-        return diffSchema;
-    }
+        String fieldName = parser.getCurrentName();
+        MajorType fieldType = JacksonHelper.getFieldType(token);
+        ReadType readType = null;
+        switch (token) {
+          case START_ARRAY:
+            readType = ReadType.ARRAY;
+            break;
+          case START_OBJECT:
+            readType = ReadType.OBJECT;
+            break;
+        }
+        if (fieldType != null) { // Including nulls
+          boolean currentFieldNotFull = recordData(
+              parentField,
+              readType,
+              reader,
+              fieldType,
+              prefixFieldName,
+              fieldName,
+              rowIndex, colIndex);
+
+          isFull = isFull || !currentFieldNotFull;
 
-    public BufferAllocator getAllocator() {
-        return allocator;
+        }
+        token = parser.nextToken();
+        colIndex += 1;
+      }
+      return !isFull;
     }
 
-    public OutputMutator getOutputMutator() {
-        return outputMutator;
+    private void removeChildFields(List<Field> removedFields, Field field) {
+      RecordSchema schema = field.getAssignedSchema();
+      if (schema == null) {
+        return;
+      }
+      for (Field childField : schema.getFields()) {
+        removedFields.add(childField);
+        if (childField.hasSchema()) {
+          removeChildFields(removedFields, childField);
+        }
+      }
     }
 
-    public static enum ReadType {
-        ARRAY(END_ARRAY) {
-            @Override
-            public Field createField(RecordSchema parentSchema, String prefixFieldName, String fieldName, MajorType fieldType, int index) {
-                return new OrderedField(parentSchema, fieldType, prefixFieldName, index);
-            }
-
-            @Override
-            public RecordSchema createSchema() throws IOException {
-                return new ListSchema();
-            }
-        },
-        OBJECT(END_OBJECT) {
-            @Override
-            public Field createField(RecordSchema parentSchema,
-                                     String prefixFieldName,
-                                     String fieldName,
-                                     MajorType fieldType,
-                                     int index) {
-                return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
-            }
-
-            @Override
-            public RecordSchema createSchema() throws IOException {
-                return new ObjectSchema();
-            }
-        };
-
-        private final JsonToken endObject;
-
-        ReadType(JsonToken endObject) {
-            this.endObject = endObject;
+    private boolean recordData(Field parentField,
+                               JSONRecordReader.ReadType readType,
+                               JSONRecordReader reader,
+                               MajorType fieldType,
+                               String prefixFieldName,
+                               String fieldName,
+                               int rowIndex,
+                               int colIndex) throws IOException, SchemaChangeException {
+      RecordSchema currentSchema = reader.getCurrentSchema();
+      Field field = currentSchema.getField(fieldName, colIndex);
+      boolean isFieldFound = field != null;
+      List<Field> removedFields = reader.getRemovedFields();
+      if (!isFieldFound || !field.getFieldType().equals(fieldType)) {
+        if (isFieldFound) {
+          if (field.hasSchema()) {
+            removeChildFields(removedFields, field);
+          }
+          removedFields.add(field);
+          currentSchema.removeField(field, colIndex);
         }
 
-        public JsonToken getEndObject() {
-            return endObject;
-        }
+        field = createField(
+            currentSchema,
+            prefixFieldName,
+            fieldName,
+            fieldType,
+            colIndex
+        );
+
+        reader.recordNewField(field);
+        currentSchema.addField(field);
+      }
+
+      field.setRead(true);
+
+      VectorHolder holder = getOrCreateVectorHolder(reader, field);
+      if (readType != null) {
+        RecordSchema fieldSchema = field.getAssignedSchema();
+        reader.setCurrentSchema(fieldSchema);
+
+        RecordSchema newSchema = readType.createSchema();
+        field.assignSchemaIfNull(newSchema);
+
+        if (fieldSchema == null) reader.setCurrentSchema(newSchema);
+        readType.readRecord(field, reader, field.getFullFieldName(), rowIndex);
+
+        reader.setCurrentSchema(currentSchema);
+      } else if (holder != null) {
+        return addValueToVector(
+            rowIndex,
+            holder,
+            reader.getAllocator(),
+            JacksonHelper.getValueFromFieldType(
+                reader.getParser(),
+                fieldType.getMinorType()
+            ),
+            fieldType.getMinorType()
+        );
+      }
+
+      return true;
+    }
 
-        public boolean readRecord(Field parentField,
-                                  JSONRecordReader reader,
-                                  String prefixFieldName,
-                                  int rowIndex) throws IOException, SchemaChangeException {
-            JsonParser parser = reader.getParser();
-            JsonToken token = parser.nextToken();
-            JsonToken endObject = getEndObject();
-            int colIndex = 0;
-            boolean isFull = false;
-            while (token != endObject) {
-                if (token == FIELD_NAME) {
-                    token = parser.nextToken();
-                    continue;
-                }
-
-                String fieldName = parser.getCurrentName();
-                MajorType fieldType = JacksonHelper.getFieldType(token);
-                ReadType readType = null;
-                switch (token) {
-                    case START_ARRAY:
-                        readType = ReadType.ARRAY;
-                        break;
-                    case START_OBJECT:
-                        readType = ReadType.OBJECT;
-                        break;
-                }
-                if (fieldType != null) { // Including nulls
-                    isFull = isFull ||
-                            !recordData(
-                                    parentField,
-                                    readType,
-                                    reader,
-                                    fieldType,
-                                    prefixFieldName,
-                                    fieldName,
-                                    rowIndex, colIndex);
-                }
-                token = parser.nextToken();
-                colIndex += 1;
-            }
-            return !isFull;
+    private static <T> boolean addValueToVector(int index, VectorHolder holder, BufferAllocator allocator, T val, MinorType minorType) {
+      switch (minorType) {
+        case INT: {
+          holder.incAndCheckLength(32 + 1);
+          NullableIntVector int4 = (NullableIntVector) holder.getValueVector();
+          NullableIntVector.Mutator m = int4.getMutator();
+          if (val != null) {
+            m.set(index, (Integer) val);
+          }
+          return holder.hasEnoughSpace(32 + 1);
         }
-
-        private void removeChildFields(List<Field> removedFields, Field field) {
-            RecordSchema schema = field.getAssignedSchema();
-            if (schema == null) {
-                return;
-            }
-            for (Field childField : schema.getFields()) {
-                removedFields.add(childField);
-                if (childField.hasSchema()) {
-                    removeChildFields(removedFields, childField);
-                }
-            }
+        case FLOAT4: {
+          holder.incAndCheckLength(32 + 1);
+          NullableFloat4Vector float4 = (NullableFloat4Vector) holder.getValueVector();
+          NullableFloat4Vector.Mutator m = float4.getMutator();
+          if (val != null) {
+            m.set(index, (Float) val);
+          }
+          return holder.hasEnoughSpace(32 + 1);
         }
-
-        private boolean recordData(Field parentField,
-                                   JSONRecordReader.ReadType readType,
-                                   JSONRecordReader reader,
-                                   MajorType fieldType,
-                                   String prefixFieldName,
-                                   String fieldName,
-                                   int rowIndex,
-                                   int colIndex) throws IOException, SchemaChangeException {
-            RecordSchema currentSchema = reader.getCurrentSchema();
-            Field field = currentSchema.getField(fieldName, colIndex);
-            boolean isFieldFound = field != null;
-            List<Field> removedFields = reader.getRemovedFields();
-            if (!isFieldFound || !field.getFieldType().equals(fieldType)) {
-                if (isFieldFound) {
-                    if (field.hasSchema()) {
-                        removeChildFields(removedFields, field);
-                    }
-                    removedFields.add(field);
-                    currentSchema.removeField(field, colIndex);
-                }
-
-                field = createField(
-                        currentSchema,
-                        prefixFieldName,
-                        fieldName,
-                        fieldType,
-                        colIndex
-                );
-
-                reader.recordNewField(field);
-                currentSchema.addField(field);
-            }
-
-            field.setRead(true);
-
-            VectorHolder holder = getOrCreateVectorHolder(reader, field);
-            if (readType != null) {
-                RecordSchema fieldSchema = field.getAssignedSchema();
-                reader.setCurrentSchema(fieldSchema);
-
-                RecordSchema newSchema = readType.createSchema();
-                field.assignSchemaIfNull(newSchema);
-
-                if (fieldSchema == null) reader.setCurrentSchema(newSchema);
-                readType.readRecord(field, reader, field.getFullFieldName(), rowIndex);
-
-                reader.setCurrentSchema(currentSchema);
-            } else {
-                return addValueToVector(
-                        rowIndex,
-                        holder,
-                        reader.getAllocator(),
-                        JacksonHelper.getValueFromFieldType(
-                                reader.getParser(),
-                                fieldType.getMinorType()
-                        ),
-                        fieldType.getMinorType()
-                );
-            }
-
-            return true;
+        case VARCHAR: {
+          if (val == null) {
+            return (index + 1) * 4 <= holder.getLength();
+          } else {
+            byte[] bytes = ((String) val).getBytes(UTF_8);
+            int length = bytes.length;
+            holder.incAndCheckLength(length);
+            NullableVarCharVector varLen4 = (NullableVarCharVector) holder.getValueVector();
+            NullableVarCharVector.Mutator m = varLen4.getMutator();
+            m.set(index, bytes);
+            return holder.hasEnoughSpace(length + 4 + 1);
+          }
         }
-
-        private static <T> boolean addValueToVector(int index, VectorHolder holder, BufferAllocator allocator, T val, MinorType minorType) {
-            switch (minorType) {
-                case INT: {
-                    holder.incAndCheckLength(32);
-                    NullableIntVector int4 = (NullableIntVector) holder.getValueVector();
-                    NullableIntVector.Mutator m = int4.getMutator();
-                    if (val != null) {
-                      m.set(index, (Integer) val);
-                    }
-                    return holder.hasEnoughSpace(32);
-                }
-                case FLOAT4: {
-                    holder.incAndCheckLength(32);
-                    NullableFloat4Vector float4 = (NullableFloat4Vector) holder.getValueVector();
-                    NullableFloat4Vector.Mutator m = float4.getMutator();
-                    if (val != null) {
-                      m.set(index, (Float) val);
-                    }
-                    return holder.hasEnoughSpace(32);
-                }
-                case VARCHAR: {
-                    if (val == null) {
-                        return (index + 1) * 4 <= holder.getLength();
-                    } else {
-                        byte[] bytes = ((String) val).getBytes(UTF_8);
-                        int length = bytes.length;
-                        holder.incAndCheckLength(length);
-                        NullableVarCharVector varLen4 = (NullableVarCharVector) holder.getValueVector();
-                        NullableVarCharVector.Mutator m = varLen4.getMutator();
-                        m.set(index, bytes);
-                        return holder.hasEnoughSpace(length);
-                    }
-                }
-                case BIT: {
-                    holder.incAndCheckLength(1);
-                    NullableBitVector bit = (NullableBitVector) holder.getValueVector();
-                    if (val != null) {
-                        bit.getMutator().set(index, (Boolean)val ? 1 : 0);
-                    }
-                    return holder.hasEnoughSpace(1);
-                }
-                default:
-                    throw new DrillRuntimeException("Type not supported to add value. Type: " + minorType);
-            }
+        case BIT: {
+          holder.incAndCheckLength(1);
+          NullableBitVector bit = (NullableBitVector) holder.getValueVector();
+          if (val != null) {
+            bit.getMutator().set(index, (Boolean) val ? 1 : 0);
+          }
+          return holder.hasEnoughSpace(1 + 1);
         }
+        default:
+          throw new DrillRuntimeException("Type not supported to add value. Type: " + minorType);
+      }
+    }
 
-        private VectorHolder getOrCreateVectorHolder(JSONRecordReader reader, Field field) throws SchemaChangeException {
-            return reader.getOrCreateVectorHolder(field);
-        }
+    private VectorHolder getOrCreateVectorHolder(JSONRecordReader reader, Field field) throws SchemaChangeException {
+      return reader.getOrCreateVectorHolder(field);
+    }
 
-        public abstract RecordSchema createSchema() throws IOException;
+    public abstract RecordSchema createSchema() throws IOException;
 
-        public abstract Field createField(RecordSchema parentSchema,
-                                          String prefixFieldName,
-                                          String fieldName,
-                                          MajorType fieldType,
-                                          int index);
-    }
+    public abstract Field createField(RecordSchema parentSchema,
+                                      String prefixFieldName,
+                                      String fieldName,
+                                      MajorType fieldType,
+                                      int index);
+  }
 
-    private void recordNewField(Field field) {
-        diffSchema.recordNewField(field);
-    }
+  private void recordNewField(Field field) {
+    diffSchema.recordNewField(field);
+  }
 
-    private VectorHolder getOrCreateVectorHolder(Field field) throws SchemaChangeException {
-      VectorHolder holder = valueVectorMap.get(field);
-      
-        if (holder == null) {
-            MajorType type = field.getFieldType();
-            MaterializedField f = MaterializedField.create(new SchemaPath(field.getFieldName(), ExpressionPosition.UNKNOWN), type);
-            ValueVector v = TypeHelper.getNewVector(f, allocator);
-            AllocationHelper.allocate(v, batchSize, 50);
-            holder = new VectorHolder(batchSize, v);
-            valueVectorMap.put(field, holder);
-            outputMutator.addField(v);
-            return holder;
-        }
-        return holder;
+  private VectorHolder getOrCreateVectorHolder(Field field) throws SchemaChangeException {
+    VectorHolder holder = valueVectorMap.get(field);
+
+    if (holder == null) {
+      MajorType type = field.getFieldType();
+      MaterializedField f = MaterializedField.create(new SchemaPath(field.getFullFieldName(), ExpressionPosition.UNKNOWN), type);
+
+      if (f.getType().getMinorType().equals(MinorType.MAP)) {
+        return null;
+      }
+
+      ValueVector v = TypeHelper.getNewVector(f, allocator);
+      AllocationHelper.allocate(v, batchSize, 50);
+      holder = new VectorHolder(batchSize, v);
+      valueVectorMap.put(field, holder);
+      outputMutator.addField(v);
+      return holder;
     }
+    return holder;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bff8ab65/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
index 661b029..0ebb529 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
@@ -8,6 +8,8 @@ import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.List;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
 import mockit.Expectations;
 import mockit.Injectable;
 
@@ -65,7 +67,11 @@ public class JSONRecordReaderTest {
     UserBitShared.FieldMetadata metadata = valueVector.getMetadata();
     SchemaDefProtos.FieldDef def = metadata.getDef();
     assertEquals(expectedMinorType, def.getMajorType().getMinorType());
-    assertEquals(name, def.getNameList().get(0).getName());
+    String[] parts = name.split("\\.");
+    assertEquals(parts.length, def.getNameList().size());
+    for(int i = 0; i < parts.length; ++i) {
+      assertEquals(parts[i], def.getName(i).getName());
+    }
 
     if (expectedMinorType == MinorType.MAP) {
       return;
@@ -144,7 +150,7 @@ public class JSONRecordReaderTest {
     assertEquals(0, jr.next());
   }
 
-  @Test @Ignore
+  @Test
   public void testChangedSchemaInTwoBatches(@Injectable final FragmentContext context) throws IOException,
       ExecutionSetupException {
     new Expectations() {
@@ -177,7 +183,7 @@ public class JSONRecordReaderTest {
     assertField(addFields.get(4), 0, MinorType.VARCHAR, "test2".getBytes(UTF_8), "str1");
     assertField(addFields.get(5), 0, MinorType.INT, 4, "d");
     assertEquals(1, removedFields.size());
-    //assertEquals(3, (int) removedFields.get(0));
+    assertEquals("c", removedFields.get(0).getName());
     removedFields.clear();
     assertEquals(1, jr.next());
     assertEquals(8, addFields.size()); // The reappearing of field 'c' is also included
@@ -187,12 +193,22 @@ public class JSONRecordReaderTest {
     assertField(addFields.get(6), 0, MinorType.FLOAT4, (float) 5.16, "c");
     assertField(addFields.get(7), 0, MinorType.VARCHAR, "test3".getBytes(UTF_8), "str2");
     assertEquals(2, removedFields.size());
-//    assertTrue(removedFields.contains(5));
-//    assertTrue(removedFields.contains(2));
+    Iterables.find(removedFields, new Predicate<MaterializedField>() {
+      @Override
+      public boolean apply(MaterializedField materializedField) {
+        return materializedField.getName().equals("str1");
+      }
+    });
+    Iterables.find(removedFields, new Predicate<MaterializedField>() {
+      @Override
+      public boolean apply(MaterializedField materializedField) {
+        return materializedField.getName().equals("b");
+      }
+    });
     assertEquals(0, jr.next());
   }
 
-  @Test @Ignore
+  @Test
   public void testNestedFieldInSameBatch(@Injectable final FragmentContext context) throws ExecutionSetupException {
     new Expectations() {
       {
@@ -207,61 +223,15 @@ public class JSONRecordReaderTest {
     List<ValueVector> addFields = mutator.getAddFields();
     jr.setup(mutator);
     assertEquals(2, jr.next());
-    assertEquals(5, addFields.size());
+    assertEquals(3, addFields.size());
     assertField(addFields.get(0), 0, MinorType.INT, 123, "test");
-    assertField(addFields.get(1), 0, MinorType.MAP, null, "a");
-    assertField(addFields.get(2), 0, MinorType.VARCHAR, "test".getBytes(UTF_8), "b");
-    assertField(addFields.get(3), 0, MinorType.MAP, null, "a");
-    assertField(addFields.get(4), 0, MinorType.BIT, true, "d");
+    assertField(addFields.get(1), 0, MinorType.VARCHAR, "test".getBytes(UTF_8), "a.b");
+    assertField(addFields.get(2), 0, MinorType.BIT, true, "a.a.d");
     assertField(addFields.get(0), 1, MinorType.INT, 1234, "test");
-    assertField(addFields.get(1), 1, MinorType.MAP, null, "a");
-    assertField(addFields.get(2), 1, MinorType.VARCHAR, "test2".getBytes(UTF_8), "b");
-    assertField(addFields.get(3), 1, MinorType.MAP, null, "a");
-    assertField(addFields.get(4), 1, MinorType.BIT, true, "d");
+    assertField(addFields.get(1), 1, MinorType.VARCHAR, "test2".getBytes(UTF_8), "a.b");
+    assertField(addFields.get(2), 1, MinorType.BIT, false, "a.a.d");
 
     assertEquals(0, jr.next());
     assertTrue(mutator.getRemovedFields().isEmpty());
   }
-
-  /*
-   * 
-   * @Test public void testScanJsonRemovedOneField() throws IOException { ScanJson sj = new
-   * ScanJson(getResource("scan_json_test_3.json")); PhysicalOperatorIterator iterator = sj.getIterator();
-   * expectSchemaChanged(iterator); DiffSchema diffSchema = expectSchemaChanged(iterator).getSchemaChanges();
-   * assertEquals(0, diffSchema.getAddedFields().size()); assertEquals(1, diffSchema.getRemovedFields().size());
-   * assertEquals(PhysicalOperatorIterator.NextOutcome.NONE_LEFT, iterator.next()); }
-   * 
-   * @Test public void testScanJsonAddOneRemoveOne() throws IOException { ScanJson sj = new
-   * ScanJson(getResource("scan_json_test_4.json")); PhysicalOperatorIterator iterator = sj.getIterator();
-   * expectSchemaChanged(iterator); DiffSchema diffSchema = expectSchemaChanged(iterator).getSchemaChanges();
-   * assertEquals(1, diffSchema.getAddedFields().size()); assertEquals(1, diffSchema.getRemovedFields().size());
-   * assertEquals(PhysicalOperatorIterator.NextOutcome.NONE_LEFT, iterator.next()); }
-   * 
-   * @Test public void testScanJsonCycleAdditions() throws IOException { ScanJson sj = new
-   * ScanJson(getResource("scan_json_test_5.json")); PhysicalOperatorIterator iterator = sj.getIterator();
-   * expectSchemaChanged(iterator); DiffSchema diffSchema = expectSchemaChanged(iterator).getSchemaChanges();
-   * assertEquals(1, diffSchema.getAddedFields().size()); assertEquals(1, diffSchema.getRemovedFields().size());
-   * diffSchema = expectSchemaChanged(iterator).getSchemaChanges(); assertEquals(1, diffSchema.getAddedFields().size());
-   * assertEquals(Field.FieldType.FLOAT, diffSchema.getAddedFields().get(0).getFieldType()); assertEquals("test2",
-   * diffSchema.getAddedFields().get(0).getFieldName()); assertEquals(1, diffSchema.getRemovedFields().size());
-   * assertEquals(Field.FieldType.BOOLEAN, diffSchema.getRemovedFields().get(0).getFieldType()); assertEquals("test3",
-   * diffSchema.getRemovedFields().get(0).getFieldName()); assertEquals(PhysicalOperatorIterator.NextOutcome.NONE_LEFT,
-   * iterator.next()); }
-   * 
-   * @Test public void testScanJsonModifiedOneFieldType() throws IOException { ScanJson sj = new
-   * ScanJson(getResource("scan_json_test_6.json")); PhysicalOperatorIterator iterator = sj.getIterator();
-   * expectSchemaChanged(iterator); DiffSchema diffSchema = expectSchemaChanged(iterator).getSchemaChanges();
-   * List<Field> addedFields = diffSchema.getAddedFields(); assertEquals(4, addedFields.size()); List<Field>
-   * removedFields = diffSchema.getRemovedFields(); assertEquals(4, removedFields.size()); assertFieldExists("test",
-   * Field.FieldType.STRING, addedFields); assertFieldExists("test2", Field.FieldType.BOOLEAN, addedFields);
-   * assertFieldExists("b", Field.FieldType.ARRAY, addedFields); assertFieldExists("[0]", Field.FieldType.INTEGER,
-   * addedFields); assertFieldExists("test", Field.FieldType.INTEGER, removedFields); assertFieldExists("test2",
-   * Field.FieldType.ARRAY, removedFields); assertFieldExists("b", Field.FieldType.INTEGER, removedFields);
-   * assertFieldExists("[0]", Field.FieldType.INTEGER, removedFields);
-   * assertEquals(PhysicalOperatorIterator.NextOutcome.NONE_LEFT, iterator.next()); }
-   * 
-   * private void expectSchemaChanged(PhysicalOperatorIterator iterator) throws IOException { }
-   * 
-   * private void expectDataRecord(PhysicalOperatorIterator iterator) throws IOException { }
-   */
 }