You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 03:44:40 UTC

[02/27] git commit: Repeated values in JsonRecordReader

Repeated values in JsonRecordReader


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/38ab96f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/38ab96f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/38ab96f3

Branch: refs/heads/master
Commit: 38ab96f335537c6bbdb6a4a64b1c6e13755172f6
Parents: 73fad99
Author: Timothy Chen <tn...@gmail.com>
Authored: Sat Aug 3 15:29:35 2013 -0700
Committer: Timothy Chen <tn...@gmail.com>
Committed: Thu Aug 8 23:36:41 2013 -0700

----------------------------------------------------------------------
 .../drill/common/expression/SchemaPath.java     |   2 +-
 .../templates/RepeatedValueVectors.java         |  10 +-
 .../exec/schema/json/jackson/JacksonHelper.java | 116 +++++++++++--------
 .../drill/exec/store/JSONRecordReader.java      |  86 ++++++++------
 .../apache/drill/exec/store/VectorHolder.java   |  20 ++--
 .../drill/exec/vector/AllocationHelper.java     |   2 +-
 .../drill/exec/vector/RepeatedMutator.java      |  23 ----
 .../physical/impl/TestSimpleFragmentRun.java    |  81 ++++++++++++-
 .../drill/exec/store/JSONRecordReaderTest.java  |  56 ++++++++-
 .../src/test/resources/scan_json_test_4.json    |  23 ++--
 10 files changed, 279 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java
index 19d1069..6f1a733 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java
@@ -42,7 +42,7 @@ public class SchemaPath extends LogicalExpressionBase {
       ")*$";
 
   // reads well in RegexBuddy
-  private static final String SEGMENT_REGEX = "(?:\n" + "(\\[\\d+\\])\n" + "|\n" + "'?\n"
+  private static final String SEGMENT_REGEX = "(?:\n" + "\\[(\\d+)\\]\n" + "|\n" + "'?\n"
       + "([^\\.\\[\\+\\-\\!\\]\\}]+)  # identifier\n" + "'?\n" + ")\n"
       + "([\\+\\-\\!\\]\\}]?)         # collision type";
   private static final int GROUP_INDEX = 1;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
index c629a1d..f4a7049 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
@@ -1,4 +1,4 @@
-
+import org.apache.drill.exec.vector.ValueVector;
 
 <@pp.dropOutputFile />
 <#list types as type>
@@ -249,7 +249,7 @@ import com.google.common.collect.Lists;
     }
   }
   
-  public final class Mutator implements RepeatedMutator {
+  public final class Mutator implements ValueVector.Mutator {
 
     
     private Mutator(){
@@ -264,12 +264,18 @@ import com.google.common.collect.Lists;
      */
     public void add(int index, <#if type.major == "VarLen">byte[]<#elseif (type.width < 4)>int<#else>${minor.javaType!type.javaType}</#if> value) {
       int nextOffset = offsets.getAccessor().get(index+1);
+      if (index > 0 && nextOffset == 0) {
+        nextOffset = offsets.getAccessor().get(index);
+      }
       values.getMutator().set(nextOffset, value);
       offsets.getMutator().set(index+1, nextOffset+1);
     }
 
     public void add(int index, ${minor.class}Holder holder){
       int nextOffset = offsets.getAccessor().get(index+1);
+      if (index > 0 && nextOffset == 0) {
+        nextOffset = offsets.getAccessor().get(index);
+      }
       values.getMutator().set(nextOffset, holder);
       offsets.getMutator().set(index+1, nextOffset+1);
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
index 0e2c052..d8f0646 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
@@ -18,63 +18,83 @@
 
 package org.apache.drill.exec.schema.json.jackson;
 
-import java.io.IOException;
-
-import org.apache.drill.common.types.Types;
-import org.apache.drill.common.types.TypeProtos.DataMode;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
 
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
+import java.io.IOException;
 
 public class JacksonHelper {
 
-    public static final MajorType STRING_TYPE = Types.optional(MinorType.VARCHAR);
-    public static final MajorType BOOLEAN_TYPE = Types.optional(MinorType.BIT);
-    public static final MajorType ARRAY_TYPE = Types.repeated(MinorType.LATE);
-    public static final MajorType MAP_TYPE = Types.repeated(MinorType.MAP);
-    public static final MajorType INT_TYPE = Types.optional(MinorType.INT);
-    public static final MajorType FLOAT_TYPE = Types.optional(MinorType.FLOAT4);
-    public static final MajorType NULL_TYPE = Types.optional(MinorType.LATE);
+  public static final MajorType STRING_TYPE = Types.optional(MinorType.VARCHAR);
+  public static final MajorType REPEATED_STRING_TYPE = Types.repeated(MinorType.VARCHAR);
+  public static final MajorType BOOLEAN_TYPE = Types.optional(MinorType.BIT);
+  public static final MajorType REPEATED_BOOLEAN_TYPE = Types.repeated(MinorType.BIT);
+  public static final MajorType ARRAY_TYPE = Types.repeated(MinorType.LATE);
+  public static final MajorType MAP_TYPE = Types.repeated(MinorType.MAP);
+  public static final MajorType INT_TYPE = Types.optional(MinorType.INT);
+  public static final MajorType REPEATED_INT_TYPE = Types.repeated(MinorType.INT);
+  public static final MajorType FLOAT_TYPE = Types.optional(MinorType.FLOAT4);
+  public static final MajorType REPEATED_FLOAT_TYPE = Types.repeated(MinorType.FLOAT4);
+  public static final MajorType NULL_TYPE = Types.optional(MinorType.LATE);
 
-    public static MajorType getFieldType(JsonToken token) {
-        switch(token) {
-            case VALUE_STRING:
-                return STRING_TYPE;
-            case VALUE_FALSE:
-                return BOOLEAN_TYPE;
-            case VALUE_TRUE:
-                return BOOLEAN_TYPE;
-            case START_ARRAY:
-                return ARRAY_TYPE;
-            case START_OBJECT:
-                return MAP_TYPE;
-            case VALUE_NUMBER_INT:
-                return INT_TYPE;
-            case VALUE_NUMBER_FLOAT:
-                return FLOAT_TYPE;
-            case VALUE_NULL:
-                return NULL_TYPE;
-        }
+  public static MajorType getFieldType(JsonToken token, boolean repeated) {
+    if (repeated) {
+      switch (token) {
+        case VALUE_STRING:
+          return REPEATED_STRING_TYPE;
+        case VALUE_FALSE:
+        case VALUE_TRUE:
+          return REPEATED_BOOLEAN_TYPE;
+        case START_ARRAY:
+          return ARRAY_TYPE;
+        case START_OBJECT:
+          return MAP_TYPE;
+        case VALUE_NUMBER_INT:
+          return REPEATED_INT_TYPE;
+        case VALUE_NUMBER_FLOAT:
+          return REPEATED_FLOAT_TYPE;
+      }
+    } else {
 
-        throw new UnsupportedOperationException("Unsupported Jackson type: " + token);
+      switch (token) {
+        case VALUE_STRING:
+          return STRING_TYPE;
+        case VALUE_FALSE:
+        case VALUE_TRUE:
+          return BOOLEAN_TYPE;
+        case START_ARRAY:
+          return ARRAY_TYPE;
+        case START_OBJECT:
+          return MAP_TYPE;
+        case VALUE_NUMBER_INT:
+          return INT_TYPE;
+        case VALUE_NUMBER_FLOAT:
+          return FLOAT_TYPE;
+        case VALUE_NULL:
+          return NULL_TYPE;
+      }
     }
 
-    public static Object getValueFromFieldType(JsonParser parser, MinorType fieldType) throws IOException {
-        switch (fieldType) {
-            case INT:
-                return parser.getIntValue();
-            case VARCHAR:
-                return parser.getValueAsString();
-            case FLOAT4:
-                return parser.getFloatValue();
-            case BIT:
-                return parser.getBooleanValue();
-            case LATE:
-                return null;
-            default:
-                throw new RuntimeException("Unexpected Field type to return value: " + fieldType.toString());
-        }
+    throw new UnsupportedOperationException("Unsupported Jackson type: " + token + ", Repeated: " + repeated);
+  }
+
+  public static Object getValueFromFieldType(JsonParser parser, MinorType fieldType) throws IOException {
+    switch (fieldType) {
+      case INT:
+        return parser.getIntValue();
+      case VARCHAR:
+        return parser.getValueAsString();
+      case FLOAT4:
+        return parser.getFloatValue();
+      case BIT:
+        return parser.getBooleanValue();
+      case LATE:
+        return null;
+      default:
+        throw new RuntimeException("Unexpected Field type to return value: " + fieldType.toString());
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
index ff7d315..a4887c0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
@@ -103,7 +103,7 @@ public class JSONRecordReader implements RecordReader {
     int nextRowIndex = 0;
 
     try {
-      while (ReadType.OBJECT.readRecord(null, this, null, nextRowIndex++, 0)) {
+      while (ReadType.OBJECT.readRecord(this, null, nextRowIndex++, 0)) {
         parser.nextToken(); // Read to START_OBJECT token
 
         if (!parser.hasCurrentToken()) {
@@ -180,12 +180,14 @@ public class JSONRecordReader implements RecordReader {
     ARRAY(END_ARRAY) {
       @Override
       public Field createField(RecordSchema parentSchema, String prefixFieldName, String fieldName, MajorType fieldType, int index) {
-        return new OrderedField(parentSchema, fieldType, prefixFieldName, index);
+        return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
+        //return new OrderedField(parentSchema, fieldType, prefixFieldName, index);
       }
 
       @Override
       public RecordSchema createSchema() throws IOException {
-        return new ListSchema();
+        return new ObjectSchema();
+        //return new ListSchema();
       }
     },
     OBJECT(END_OBJECT) {
@@ -215,8 +217,7 @@ public class JSONRecordReader implements RecordReader {
     }
 
     @SuppressWarnings("ConstantConditions")
-    public boolean readRecord(Field parentField,
-                              JSONRecordReader reader,
+    public boolean readRecord(JSONRecordReader reader,
                               String prefixFieldName,
                               int rowIndex,
                               int groupCount) throws IOException, SchemaChangeException {
@@ -232,7 +233,7 @@ public class JSONRecordReader implements RecordReader {
         }
 
         String fieldName = parser.getCurrentName();
-        MajorType fieldType = JacksonHelper.getFieldType(token);
+        MajorType fieldType = JacksonHelper.getFieldType(token, this == ReadType.ARRAY);
         ReadType readType = null;
         switch (token) {
           case START_ARRAY:
@@ -246,17 +247,17 @@ public class JSONRecordReader implements RecordReader {
         }
 
         if (fieldType != null) { // Including nulls
-          isFull = isFull ||
-              !recordData(
-                  parentField,
-                  readType,
-                  reader,
-                  fieldType,
-                  prefixFieldName,
-                  fieldName,
-                  rowIndex,
-                  colIndex,
-                  groupCount);
+          boolean currentFieldFull = !recordData(
+              readType,
+              reader,
+              fieldType,
+              prefixFieldName,
+              fieldName,
+              rowIndex,
+              colIndex,
+              groupCount);
+
+          isFull = isFull || currentFieldFull;
         }
         token = parser.nextToken();
         colIndex += 1;
@@ -277,8 +278,7 @@ public class JSONRecordReader implements RecordReader {
       }
     }
 
-    private boolean recordData(Field parentField,
-                               JSONRecordReader.ReadType readType,
+    private boolean recordData(JSONRecordReader.ReadType readType,
                                JSONRecordReader reader,
                                MajorType fieldType,
                                String prefixFieldName,
@@ -322,18 +322,13 @@ public class JSONRecordReader implements RecordReader {
         field.assignSchemaIfNull(newSchema);
 
         if (fieldSchema == null) reader.setCurrentSchema(newSchema);
-        if(readType == ReadType.ARRAY) {
-          readType.readRecord(field, reader, field.getFullFieldName(), rowIndex, groupCount);
-        } else {
-          readType.readRecord(field, reader, field.getFullFieldName(), rowIndex, groupCount);
-        }
+        readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount);
 
         reader.setCurrentSchema(currentSchema);
       } else {
         return addValueToVector(
             rowIndex,
             holder,
-            reader.getAllocator(),
             JacksonHelper.getValueFromFieldType(
                 reader.getParser(),
                 fieldType.getMinorType()
@@ -346,10 +341,10 @@ public class JSONRecordReader implements RecordReader {
       return true;
     }
 
-    private static <T> boolean addValueToVector(int index, VectorHolder holder, BufferAllocator allocator, T val, MinorType minorType, int groupCount) {
+    private static <T> boolean addValueToVector(int index, VectorHolder holder, T val, MinorType minorType, int groupCount) {
       switch (minorType) {
         case INT: {
-          holder.incAndCheckLength(32);
+          holder.incAndCheckLength(32 + 1);
           if (groupCount == 0) {
             if (val != null) {
               NullableIntVector int4 = (NullableIntVector) holder.getValueVector();
@@ -363,13 +358,14 @@ public class JSONRecordReader implements RecordReader {
 
             RepeatedIntVector repeatedInt4 = (RepeatedIntVector) holder.getValueVector();
             RepeatedIntVector.Mutator m = repeatedInt4.getMutator();
+            holder.setGroupCount(index);
             m.add(index, (Integer) val);
           }
 
-          return holder.hasEnoughSpace(32);
+          return holder.hasEnoughSpace(32 + 1);
         }
         case FLOAT4: {
-          holder.incAndCheckLength(32);
+          holder.incAndCheckLength(32 + 1);
           if (groupCount == 0) {
             if (val != null) {
               NullableFloat4Vector float4 = (NullableFloat4Vector) holder.getValueVector();
@@ -383,9 +379,10 @@ public class JSONRecordReader implements RecordReader {
 
             RepeatedFloat4Vector repeatedFloat4 = (RepeatedFloat4Vector) holder.getValueVector();
             RepeatedFloat4Vector.Mutator m = repeatedFloat4.getMutator();
-            m.add(groupCount, (Float) val);
+            holder.setGroupCount(index);
+            m.add(index, (Float) val);
           }
-          return holder.hasEnoughSpace(32);
+          return holder.hasEnoughSpace(32 + 1);
         }
         case VARCHAR: {
           if (val == null) {
@@ -401,16 +398,29 @@ public class JSONRecordReader implements RecordReader {
             } else {
               RepeatedVarCharVector repeatedVarLen4 = (RepeatedVarCharVector) holder.getValueVector();
               RepeatedVarCharVector.Mutator m = repeatedVarLen4.getMutator();
+              holder.setGroupCount(index);
               m.add(index, bytes);
             }
-            return holder.hasEnoughSpace(length);
+            return holder.hasEnoughSpace(length + 4 + 1);
           }
         }
         case BIT: {
-          holder.incAndCheckLength(1);
-          NullableBitVector bit = (NullableBitVector) holder.getValueVector();
-          if (val != null) {
-            bit.getMutator().set(index, (Boolean) val ? 1 : 0);
+          holder.incAndCheckLength(1 + 1);
+          if (groupCount == 0) {
+            if (val != null) {
+              NullableBitVector bit = (NullableBitVector) holder.getValueVector();
+              NullableBitVector.Mutator m = bit.getMutator();
+              m.set(index, (Boolean) val ? 1 : 0);
+            }
+          } else {
+            if (val == null) {
+              throw new UnsupportedOperationException("Nullable repeated boolean is not supported.");
+            }
+
+            RepeatedBitVector repeatedBit = (RepeatedBitVector) holder.getValueVector();
+            RepeatedBitVector.Mutator m = repeatedBit.getMutator();
+            holder.setGroupCount(index);
+            m.add(index, (Boolean) val ? 1 : 0);
           }
           return holder.hasEnoughSpace(1 + 1);
         }
@@ -443,7 +453,9 @@ public class JSONRecordReader implements RecordReader {
       MajorType type = field.getFieldType();
       MaterializedField f = MaterializedField.create(new SchemaPath(field.getFullFieldName(), ExpressionPosition.UNKNOWN), type);
 
-      if (f.getType().getMinorType().equals(MinorType.MAP)) {
+      MinorType minorType = f.getType().getMinorType();
+
+      if (minorType.equals(MinorType.MAP) || minorType.equals(MinorType.LATE)) {
         return null;
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
index 43d3cd9..2c28082 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
@@ -64,22 +64,18 @@ public class VectorHolder {
 
   public void populateVectorLength() {
     ValueVector.Mutator mutator = vector.getMutator();
-    if(mutator instanceof NonRepeatedMutator) {
-      ((NonRepeatedMutator)mutator).setValueCount(count);
-    } else if(mutator instanceof RepeatedMutator) {
-      ((RepeatedMutator)mutator).setGroupAndValueCount(groupCount, count);
+    if(vector instanceof RepeatedFixedWidthVector || vector instanceof RepeatedVariableWidthVector) {
+      mutator.setValueCount(groupCount);
     } else {
-      throw new UnsupportedOperationException("Mutator not supported: " + mutator.getClass().getName());
+      mutator.setValueCount(count);
     }
   }
 
   public void allocateNew(int valueLength) {
-    if (vector instanceof FixedWidthVector) {
-      ((FixedWidthVector) vector).allocateNew(valueLength);
-    } else if (vector instanceof VariableWidthVector) {
-      ((VariableWidthVector) vector).allocateNew(valueLength * 10, valueLength);
-    } else {
-      throw new UnsupportedOperationException();
-    }
+    AllocationHelper.allocate(vector, valueLength, 10, 5);
+  }
+
+  public void allocateNew(int valueLength, int repeatedPerTop) {
+    AllocationHelper.allocate(vector, valueLength, 10, repeatedPerTop);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
index 69c17f4..5007dbd 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
@@ -10,7 +10,7 @@ public class AllocationHelper {
   public static void allocate(ValueVector v, int valueCount, int bytesPerValue, int repeatedPerTop){
     if(v instanceof FixedWidthVector){
       ((FixedWidthVector) v).allocateNew(valueCount);
-    }else if(v instanceof VariableWidthVector){
+    } else if (v instanceof VariableWidthVector) {
       ((VariableWidthVector) v).allocateNew(valueCount * bytesPerValue, valueCount);
     }else if(v instanceof RepeatedFixedWidthVector){
       ((RepeatedFixedWidthVector) v).allocateNew(valueCount, valueCount * repeatedPerTop);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java
deleted file mode 100644
index 1227d02..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-
-package org.apache.drill.exec.vector;
-
-public interface RepeatedMutator extends ValueVector.Mutator {
-  public void setGroupAndValueCount(int groupCount, int valueCount);
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
index 5d4e700..cabe9b3 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
@@ -87,7 +87,6 @@ public class TestSimpleFragmentRun extends PopUnitTestBase {
         System.out.println();
       }
 
-
       for (int i = 0; i < batchLoader.getRecordCount(); i++) {
         boolean first = true;
         recordCount++;
@@ -101,13 +100,87 @@ public class TestSimpleFragmentRun extends PopUnitTestBase {
         }
         if(!first) System.out.println();
       }
-    
-  
-
     }
     logger.debug("Received results {}", results);
     assertEquals(recordCount, 200);
     }
   }
 
+  /** Runs the /physical_json_scan_test1.json plan on a local Drillbit, prints the first batch and checks batch/record counts. */
+  @Test
+  public void runJSONScanPopFragment() throws Exception {
+    try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+         Drillbit bit = new Drillbit(CONFIG, serviceSet);
+         DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) {
+
+      // run query.
+      bit.run();
+      client.connect();
+      List<QueryResultBatch> results = client.runQuery(QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile("/physical_json_scan_test1.json"), Charsets.UTF_8));
+
+      // look at records
+      RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator());
+      int recordCount = 0;
+
+      int expectedBatchCount = 2;
+
+      assertEquals(expectedBatchCount, results.size());
+
+      for (int i = 0; i < results.size(); ++i) {
+        QueryResultBatch batch = results.get(i);
+        if (i == 0) {
+          assertTrue(batch.hasData());
+        } else {
+          assertFalse(batch.hasData());
+          continue; // nothing to print, but keep going so the record-count assertion below still runs
+        }
+
+        assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
+        boolean firstColumn = true;
+
+        // print headers.
+        System.out.println("\n\n========NEW SCHEMA=========\n\n");
+        for (VectorWrapper<?> v : batchLoader) {
+
+          if (firstColumn) {
+            firstColumn = false;
+          } else {
+            System.out.print("\t");
+          }
+          System.out.print(v.getField().getName());
+          System.out.print("[");
+          System.out.print(v.getField().getType().getMinorType());
+          System.out.print("]");
+        }
+
+        System.out.println();
+
+        // Print one tab-separated line per row; r indexes rows of this batch, i indexes batches.
+        for (int r = 0; r < batchLoader.getRecordCount(); r++) {
+          boolean first = true;
+          recordCount++;
+          for (VectorWrapper<?> v : batchLoader) {
+            if (first) {
+              first = false;
+            } else {
+              System.out.print("\t");
+            }
+
+            ValueVector.Accessor accessor = v.getValueVector().getAccessor();
+
+            if (v.getField().getType().getMinorType() == TypeProtos.MinorType.VARCHAR) {
+              System.out.print(new String((byte[]) accessor.getObject(r), UTF_8));
+            } else {
+              System.out.print(accessor.getObject(r));
+            }
+          }
+          if (!first) System.out.println();
+        }
+
+      }
+
+      assertEquals(2, recordCount);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
index 0ebb529..b39ac8a 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
@@ -5,6 +5,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -68,7 +69,12 @@ public class JSONRecordReaderTest {
     SchemaDefProtos.FieldDef def = metadata.getDef();
     assertEquals(expectedMinorType, def.getMajorType().getMinorType());
     String[] parts = name.split("\\.");
-    assertEquals(parts.length, def.getNameList().size());
+    int expected = parts.length;
+    boolean expectingArray = List.class.isAssignableFrom(value.getClass());
+    if (expectingArray) {
+      expected += 1;
+    }
+    assertEquals(expected, def.getNameList().size());
     for(int i = 0; i < parts.length; ++i) {
       assertEquals(parts[i], def.getName(i).getName());
     }
@@ -78,10 +84,21 @@ public class JSONRecordReaderTest {
     }
 
     T val = (T) valueVector.getAccessor().getObject(index);
-    if (val instanceof byte[]) {
-      assertTrue(Arrays.equals((byte[]) value, (byte[]) val));
+    assertValue(value, val);
+  }
+
+  private void assertValue(Object expected, Object found) {
+    if (found instanceof byte[]) {
+      assertTrue(Arrays.equals((byte[]) expected, (byte[]) found));
+    } else if(found instanceof ArrayList) {
+      List expectedArray = (List) expected;
+      List foundArray = (List) found;
+      assertEquals(expectedArray.size(), foundArray.size());
+      for(int i = 0; i < expectedArray.size(); ++i) {
+        assertValue(expectedArray.get(i), foundArray.get(i));
+      }
     } else {
-      assertEquals(value, val);
+      assertEquals(expected, found);
     }
   }
 
@@ -234,4 +251,35 @@ public class JSONRecordReaderTest {
     assertEquals(0, jr.next());
     assertTrue(mutator.getRemovedFields().isEmpty());
   }
+  /** Reads scan_json_test_4.json and checks the scalar and array-valued fields produced for its two records. */
+  @Test
+  public void testRepeatedFields(@Injectable final FragmentContext context) throws ExecutionSetupException {
+    new Expectations() {
+      {
+        context.getAllocator();
+        returns(new DirectBufferAllocator());
+      }
+    };
+
+    JSONRecordReader reader = new JSONRecordReader(context, getResource("scan_json_test_4.json"));
+    // The mock mutator exposes the vectors added through it, so they can be inspected after the read.
+    MockOutputMutator output = new MockOutputMutator();
+    List<ValueVector> vectors = output.getAddFields();
+    reader.setup(output);
+    assertEquals(2, reader.next());
+    assertEquals(7, vectors.size());
+    assertField(vectors.get(0), 0, MinorType.INT, 123, "test"); // index 0: first record
+    assertField(vectors.get(1), 0, MinorType.INT, Arrays.asList(1, 2, 3), "test2");
+    assertField(vectors.get(2), 0, MinorType.INT, Arrays.asList(4, 5, 6), "test3.a");
+    assertField(vectors.get(3), 0, MinorType.INT, Arrays.asList(7, 8, 9), "test3.b");
+    assertField(vectors.get(4), 0, MinorType.INT, Arrays.asList(10, 11, 12), "test3.c.d");
+    assertField(vectors.get(5), 0, MinorType.FLOAT4, Arrays.<Float>asList(1.1f, 1.2f, 1.3f), "testFloat");
+    assertField(vectors.get(6), 0, MinorType.VARCHAR, Arrays.asList("hello".getBytes(UTF_8), "drill".getBytes(UTF_8)), "testStr");
+    assertField(vectors.get(1), 1, MinorType.INT, Arrays.asList(1, 2), "test2"); // index 1: second record
+    assertField(vectors.get(2), 1, MinorType.INT, Arrays.asList(7, 7, 7, 8), "test3.a");
+    assertField(vectors.get(5), 1, MinorType.FLOAT4, Arrays.<Float>asList(2.2f, 2.3f, 2.4f), "testFloat");
+    // A further read must report no records, and no field may have been removed along the way.
+    assertEquals(0, reader.next());
+    assertTrue(output.getRemovedFields().isEmpty());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_4.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_4.json b/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_4.json
index 0fb3202..fd003ac 100644
--- a/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_4.json
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_4.json
@@ -1,14 +1,21 @@
 {
     "test": 123,
     "test2": [1,2,3],
-    "a": {
-    	 "b": 1
-    }
+    "test3": {
+        "a": [4,5,6],
+        "b": [7,8,9],
+        "c": {
+            "d": [10, 11, 12]
+        }
+    },
+    "testFloat": [1.1, 1.2, 1.3],
+    "testStr": ["hello", "drill"]
 }
 {
-    "test": 1234,
-    "test3": false,
-    "a": {
-    	 "b": 2
-    }
+    "test2": [1,2],
+    "test3": {
+        "a": [7,7,7,8],
+        "b": []
+    },
+    "testFloat": [2.2, 2.3, 2.4]
 }
\ No newline at end of file