You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 03:44:53 UTC

[15/27] git commit: Updates to add subscan support to JSON

Updates to add subscan support to JSON


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2884db7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2884db7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2884db7a

Branch: refs/heads/master
Commit: 2884db7a9162f9d348dbb88663e0da191a399c4d
Parents: 402be7e
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu Aug 15 17:57:24 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Aug 15 17:57:24 2013 -0700

----------------------------------------------------------------------
 sandbox/prototype/exec/java-exec/pom.xml        |  15 +-
 .../apache/drill/exec/opt/BasicOptimizer.java   |   2 +-
 .../physical/config/JSONScanBatchCreator.java   |  46 --
 .../drill/exec/physical/config/JSONScanPOP.java | 114 -----
 .../exec/physical/config/MockGroupScanPOP.java  | 221 ---------
 .../exec/physical/config/MockRecordReader.java  | 118 -----
 .../physical/config/MockScanBatchCreator.java   |  46 --
 .../exec/physical/config/MockStorePOP.java      |  75 ---
 .../exec/physical/config/MockSubScanPOP.java    | 115 -----
 .../drill/exec/physical/impl/ImplCreator.java   |  56 +--
 .../drill/exec/store/JSONRecordReader.java      | 486 ------------------
 .../apache/drill/exec/store/VectorHolder.java   |  21 +-
 .../drill/exec/store/json/JSONGroupScan.java    | 145 ++++++
 .../drill/exec/store/json/JSONRecordReader.java | 489 +++++++++++++++++++
 .../exec/store/json/JSONScanBatchCreator.java   |  46 ++
 .../drill/exec/store/json/JSONSubScan.java      |  86 ++++
 .../drill/exec/store/mock/MockGroupScanPOP.java | 221 +++++++++
 .../drill/exec/store/mock/MockRecordReader.java | 118 +++++
 .../exec/store/mock/MockScanBatchCreator.java   |  46 ++
 .../exec/store/mock/MockStorageEngine.java      |   3 +-
 .../drill/exec/store/mock/MockStorePOP.java     |  75 +++
 .../drill/exec/store/mock/MockSubScanPOP.java   | 115 +++++
 .../drill/exec/store/parquet/ColumnReader.java  |   2 +-
 .../exec/store/parquet/PageReadStatus.java      |  15 +-
 .../exec/store/parquet/ParquetGroupScan.java    |   5 +-
 .../store/parquet/ParquetScanBatchCreator.java  |   5 +-
 .../drill/exec/store/JSONRecordReaderTest.java  |   2 +
 .../store/parquet/ParquetRecordReaderTest.java  |  33 +-
 .../src/test/resources/parquet_scan_screen.json |   4 +-
 .../apache/drill/jdbc/test/FullEngineTest.java  |   2 +
 30 files changed, 1416 insertions(+), 1311 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/pom.xml b/sandbox/prototype/exec/java-exec/pom.xml
index a2e8501..b36208c 100644
--- a/sandbox/prototype/exec/java-exec/pom.xml
+++ b/sandbox/prototype/exec/java-exec/pom.xml
@@ -46,12 +46,12 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-column</artifactId>
-      <version>1.0.1-SNAPSHOT</version>
+      <version>1.0.1</version>
     </dependency>
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-hadoop</artifactId>
-      <version>1.0.1-SNAPSHOT</version>
+      <version>1.0.1</version>
     </dependency>
     <dependency>
       <groupId>com.yammer.metrics</groupId>
@@ -106,7 +106,7 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-core</artifactId>
-      <version>1.0.3-mapr-2.1.2.1</version>
+      <version>1.2.1</version>
       <exclusions>
         <exclusion>
           <artifactId>jets3t</artifactId>
@@ -291,12 +291,5 @@
   </build>
 
 
-  <repositories>
-    <repository>
-      <id>mapr-releases</id>
-      <url>http://repository.mapr.com/maven/</url>
-      <snapshots><enabled>false</enabled></snapshots>
-      <releases><enabled>true</enabled></releases>
-    </repository>
-  </repositories>
+
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index b5eea03..2c2a342 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -28,10 +28,10 @@ import org.apache.drill.exec.exception.SetupException;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.config.MockGroupScanPOP;
 import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.config.SelectionVectorRemover;
 import org.apache.drill.exec.store.StorageEngine;
+import org.apache.drill.exec.store.mock.MockGroupScanPOP;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanBatchCreator.java
deleted file mode 100644
index f93f03b..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanBatchCreator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-
-package org.apache.drill.exec.physical.config;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.physical.impl.ScanBatch;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.JSONRecordReader;
-import org.apache.drill.exec.store.RecordReader;
-
-import java.util.List;
-
-public class JSONScanBatchCreator implements BatchCreator<JSONScanPOP> {
-
-    @Override
-    public RecordBatch getBatch(FragmentContext context, JSONScanPOP config, List<RecordBatch> children) throws ExecutionSetupException {
-        Preconditions.checkArgument(children.isEmpty());
-        List<JSONScanPOP.ScanEntry> entries = config.getReadEntries();
-        List<RecordReader> readers = Lists.newArrayList();
-        for (JSONScanPOP.ScanEntry e : entries) {
-            readers.add(new JSONRecordReader(context, e.getUrl()));
-        }
-
-        return new ScanBatch(context, readers.iterator());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanPOP.java
deleted file mode 100644
index 1dcf5e1..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/JSONScanPOP.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-
-package org.apache.drill.exec.physical.config;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.ReadEntry;
-import org.apache.drill.exec.physical.base.AbstractScan;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.Scan;
-import org.apache.drill.exec.physical.base.Size;
-import org.apache.drill.exec.proto.CoordinationProtos;
-
-import java.io.File;
-import java.net.URI;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-@JsonTypeName("json-scan")
-public class JSONScanPOP extends AbstractScan<JSONScanPOP.ScanEntry> {
-    private static int ESTIMATED_RECORD_SIZE = 1024; // 1kb
-
-    private LinkedList[] mappings;
-
-    @JsonCreator
-    public JSONScanPOP(@JsonProperty("entries") List<JSONScanPOP.ScanEntry> readEntries) {
-        super(readEntries);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) {
-        checkArgument(endpoints.size() <= getReadEntries().size());
-
-        mappings = new LinkedList[endpoints.size()];
-
-        int i = 0;
-        for (ScanEntry e : this.getReadEntries()) {
-            if (i == endpoints.size()) i = 0;
-            LinkedList entries = mappings[i];
-            if (entries == null) {
-                entries = new LinkedList<>();
-                mappings[i] = entries;
-            }
-            entries.add(e);
-            i++;
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public Scan<?> getSpecificScan(int minorFragmentId) {
-        checkArgument(minorFragmentId < mappings.length, "Mappings length [%s] should be longer than minor fragment id [%s] but it isn't.", mappings.length, minorFragmentId);
-        return new JSONScanPOP(mappings[minorFragmentId]);
-    }
-
-    @Override
-    public List<EndpointAffinity> getOperatorAffinity() {
-        return Collections.emptyList();
-    }
-
-    @Override
-    public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
-        return new JSONScanPOP(readEntries);
-    }
-
-    public static class ScanEntry implements ReadEntry {
-        private final String url;
-        private Size size;
-
-        @JsonCreator
-        public ScanEntry(@JsonProperty("url") String url) {
-            this.url = url;
-            long fileLength = new File(URI.create(url)).length();
-            size = new Size(fileLength / ESTIMATED_RECORD_SIZE, ESTIMATED_RECORD_SIZE);
-        }
-
-        @Override
-        public OperatorCost getCost() {
-            return new OperatorCost(1, 1, 2, 2);
-        }
-
-        @Override
-        public Size getSize() {
-            return size;
-        }
-
-        public String getUrl() {
-            return url;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockGroupScanPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockGroupScanPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockGroupScanPOP.java
deleted file mode 100644
index a28c7d8..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockGroupScanPOP.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.physical.config;
-
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.ReadEntry;
-import org.apache.drill.exec.physical.base.*;
-import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.vector.TypeHelper;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
-
-@JsonTypeName("mock-scan")
-public class MockGroupScanPOP extends AbstractGroupScan {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockGroupScanPOP.class);
-
-  private final String url;
-  protected final List<MockScanEntry> readEntries;
-  private final OperatorCost cost;
-  private final Size size;
-  private  LinkedList<MockScanEntry>[] mappings;
-
-  @JsonCreator
-  public MockGroupScanPOP(@JsonProperty("url") String url, @JsonProperty("entries") List<MockScanEntry> readEntries) {
-    this.readEntries = readEntries;
-    OperatorCost cost = new OperatorCost(0,0,0,0);
-    Size size = new Size(0,0);
-    for(MockScanEntry r : readEntries){
-      cost = cost.add(r.getCost());
-      size = size.add(r.getSize());
-    }
-    this.cost = cost;
-    this.size = size;
-    this.url = url;
-  }
-
-  public String getUrl() {
-    return url;
-  }
-
-  @JsonProperty("entries")
-  public List<MockScanEntry> getReadEntries() {
-    return readEntries;
-  }
-  
-  public static class MockScanEntry implements ReadEntry {
-
-    private final int records;
-    private final MockColumn[] types;
-    private final int recordSize;
-    
-
-    @JsonCreator
-    public MockScanEntry(@JsonProperty("records") int records, @JsonProperty("types") MockColumn[] types) {
-      this.records = records;
-      this.types = types;
-      int size = 0;
-      for(MockColumn dt : types){
-        size += TypeHelper.getSize(dt.getMajorType());
-      }
-      this.recordSize = size;
-    }
-
-    @Override
-    public OperatorCost getCost() {
-      return new OperatorCost(1, 2, 1, 1);
-    }
-    
-    public int getRecords() {
-      return records;
-    }
-
-    public MockColumn[] getTypes() {
-      return types;
-    }
-
-    @Override
-    public Size getSize() {
-      return new Size(records, recordSize);
-    }
-  }
-  
-  @JsonInclude(Include.NON_NULL)
-  public static class MockColumn{
-    @JsonProperty("type") public MinorType minorType;
-    public String name;
-    public DataMode mode;
-    public Integer width;
-    public Integer precision;
-    public Integer scale;
-    
-    
-    @JsonCreator
-    public MockColumn(@JsonProperty("name") String name, @JsonProperty("type") MinorType minorType, @JsonProperty("mode") DataMode mode, @JsonProperty("width") Integer width, @JsonProperty("precision") Integer precision, @JsonProperty("scale") Integer scale) {
-      this.name = name;
-      this.minorType = minorType;
-      this.mode = mode;
-      this.width = width;
-      this.precision = precision;
-      this.scale = scale;
-    }
-    
-    @JsonProperty("type")
-    public MinorType getMinorType() {
-      return minorType;
-    }
-    public String getName() {
-      return name;
-    }
-    public DataMode getMode() {
-      return mode;
-    }
-    public Integer getWidth() {
-      return width;
-    }
-    public Integer getPrecision() {
-      return precision;
-    }
-    public Integer getScale() {
-      return scale;
-    }
-    
-    @JsonIgnore
-    public MajorType getMajorType(){
-      MajorType.Builder b = MajorType.newBuilder();
-      b.setMode(mode);
-      b.setMinorType(minorType);
-      if(precision != null) b.setPrecision(precision);
-      if(width != null) b.setWidth(width);
-      if(scale != null) b.setScale(scale);
-      return b.build();
-    }
-    
-  }
-
-  @Override
-  public List<EndpointAffinity> getOperatorAffinity() {
-    return Collections.emptyList();
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
-    Preconditions.checkArgument(endpoints.size() <= getReadEntries().size());
-    
-    mappings = new LinkedList[endpoints.size()];
-
-    int i =0;
-    for(MockScanEntry e : this.getReadEntries()){
-      if(i == endpoints.size()) i -= endpoints.size();
-      LinkedList<MockScanEntry> entries = mappings[i];
-      if(entries == null){
-        entries = new LinkedList<MockScanEntry>();
-        mappings[i] = entries;
-      }
-      entries.add(e);
-      i++;
-    }
-  }
-
-  @Override
-  public SubScan getSpecificScan(int minorFragmentId) {
-    assert minorFragmentId < mappings.length : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.length, minorFragmentId);
-    return new MockSubScanPOP(url, mappings[minorFragmentId]);
-  }
-
-  @Override
-  public int getMaxParallelizationWidth() {
-    return readEntries.size();
-  }
-
-  @Override
-  public OperatorCost getCost() {
-    return cost;
-  }
-
-  @Override
-  public Size getSize() {
-    return size;
-  }
-
-  @Override
-  @JsonIgnore
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
-    Preconditions.checkArgument(children.isEmpty());
-    return new MockGroupScanPOP(url, readEntries);
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
deleted file mode 100644
index bd57823..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.physical.config;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.MockGroupScanPOP.MockColumn;
-import org.apache.drill.exec.physical.config.MockGroupScanPOP.MockScanEntry;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.TypeHelper;
-import org.apache.drill.exec.vector.ValueVector;
-
-public class MockRecordReader implements RecordReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordReader.class);
-
-  private OutputMutator output;
-  private MockScanEntry config;
-  private FragmentContext context;
-  private ValueVector[] valueVectors;
-  private int recordsRead;
-  private int batchRecordCount;
-
-  public MockRecordReader(FragmentContext context, MockScanEntry config) {
-    this.context = context;
-    this.config = config;
-  }
-
-  private int getEstimatedRecordSize(MockColumn[] types) {
-    int x = 0;
-    for (int i = 0; i < types.length; i++) {
-      x += TypeHelper.getSize(types[i].getMajorType());
-    }
-    return x;
-  }
-
-  private ValueVector getVector(String name, MajorType type, int length) {
-    assert context != null : "Context shouldn't be null.";
-    MaterializedField f = MaterializedField.create(new SchemaPath(name, ExpressionPosition.UNKNOWN), type);
-    ValueVector v;
-    v = TypeHelper.getNewVector(f, context.getAllocator());
-    AllocationHelper.allocate(v, length, 50, 4);
-    
-    return v;
-
-  }
-
-  @Override
-  public void setup(OutputMutator output) throws ExecutionSetupException {
-    try {
-      this.output = output;
-      int estimateRowSize = getEstimatedRecordSize(config.getTypes());
-      valueVectors = new ValueVector[config.getTypes().length];
-      batchRecordCount = 250000 / estimateRowSize;
-
-      for (int i = 0; i < config.getTypes().length; i++) {
-        valueVectors[i] = getVector(config.getTypes()[i].getName(), config.getTypes()[i].getMajorType(), batchRecordCount);
-        output.addField(valueVectors[i]);
-      }
-      output.setNewSchema();
-    } catch (SchemaChangeException e) {
-      throw new ExecutionSetupException("Failure while setting up fields", e);
-    }
-
-  }
-
-  @Override
-  public int next() {
-    
-    int recordSetSize = Math.min(batchRecordCount, this.config.getRecords()- recordsRead);
-
-    recordsRead += recordSetSize;
-    for(ValueVector v : valueVectors){
-      AllocationHelper.allocate(v, recordSetSize, 50, 5);
-      
-      logger.debug("MockRecordReader:  Generating random data for VV of type " + v.getClass().getName());
-      ValueVector.Mutator m = v.getMutator();
-      m.setValueCount(recordSetSize);
-      m.generateTestData();
-      
-    }
-    return recordSetSize;
-  }
-
-  @Override
-  public void cleanup() {
-    for (int i = 0; i < valueVectors.length; i++) {
-      try {
-        output.removeField(valueVectors[i].getField());
-      } catch (SchemaChangeException e) {
-        logger.warn("Failure while trying to remove field.", e);
-      }
-      valueVectors[i].close();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanBatchCreator.java
deleted file mode 100644
index a06aaee..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanBatchCreator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.physical.config;
-
-import java.util.List;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.MockGroupScanPOP.MockScanEntry;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.physical.impl.ScanBatch;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.RecordReader;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class);
-
-  @Override
-  public RecordBatch getBatch(FragmentContext context, MockSubScanPOP config, List<RecordBatch> children) throws ExecutionSetupException {
-    Preconditions.checkArgument(children.isEmpty());
-    List<MockScanEntry> entries = config.getReadEntries();
-    List<RecordReader> readers = Lists.newArrayList();
-    for(MockScanEntry e : entries){
-      readers.add(new MockRecordReader(context, e));
-    }
-    return new ScanBatch(context, readers.iterator());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorePOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorePOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorePOP.java
deleted file mode 100644
index 639d0d2..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorePOP.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.physical.config;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.base.AbstractStore;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.Store;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("mock-store")
-public class MockStorePOP extends AbstractStore {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorePOP.class);
-
-  @JsonCreator
-  public MockStorePOP(@JsonProperty("child") PhysicalOperator child) {
-    super(child);
-  }
-
-  public int getMaxWidth() {
-    return 1;
-  }
-
-  @Override
-  public List<EndpointAffinity> getOperatorAffinity() {
-    return Collections.emptyList();
-  }
-
-  @Override
-  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
-    
-  }
-
-  @Override
-  public Store getSpecificStore(PhysicalOperator child, int minorFragmentId) {
-    return new MockStorePOP(child);
-  }
-
-  @Override
-  public OperatorCost getCost() {
-    return new OperatorCost(1,getSize().getRecordCount()*getSize().getRecordSize(),1,1);
-  }
-
-  @Override
-  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new MockStorePOP(child);
-  }
-
-
-  
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockSubScanPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockSubScanPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockSubScanPOP.java
deleted file mode 100644
index 7380617..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockSubScanPOP.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.physical.config;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-import com.google.common.collect.Iterators;
-import org.apache.drill.common.graph.GraphVisitor;
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.ReadEntry;
-import org.apache.drill.exec.physical.base.*;
-import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.vector.TypeHelper;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
-
-@JsonTypeName("mock-sub-scan")
-public class MockSubScanPOP extends AbstractBase implements SubScan {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockGroupScanPOP.class);
-
-  private final String url;
-  protected final List<MockGroupScanPOP.MockScanEntry> readEntries;
-  private final OperatorCost cost;
-  private final Size size;
-  private  LinkedList<MockGroupScanPOP.MockScanEntry>[] mappings;
-
-  @JsonCreator
-  public MockSubScanPOP(@JsonProperty("url") String url, @JsonProperty("entries") List<MockGroupScanPOP.MockScanEntry> readEntries) {
-    this.readEntries = readEntries;
-    OperatorCost cost = new OperatorCost(0,0,0,0);
-    Size size = new Size(0,0);
-    for(MockGroupScanPOP.MockScanEntry r : readEntries){
-      cost = cost.add(r.getCost());
-      size = size.add(r.getSize());
-    }
-    this.cost = cost;
-    this.size = size;
-    this.url = url;
-  }
-
-  public String getUrl() {
-    return url;
-  }
-
-  @JsonProperty("entries")
-  public List<MockGroupScanPOP.MockScanEntry> getReadEntries() {
-    return readEntries;
-  }
-
-  @Override
-  public Iterator<PhysicalOperator> iterator() {
-    return Iterators.emptyIterator();
-  }
-
-  @Override
-  public OperatorCost getCost() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Size getSize() {
-    throw new UnsupportedOperationException();
-  }
-
-  // will want to replace these two methods with an interface above for AbstractSubScan
-  @Override
-  public boolean isExecutable() {
-    return true;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
-    return physicalVisitor.visitSubScan(this, value);
-  }
-  // see comment above about replacing this
-
-  @Override
-  @JsonIgnore
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
-    Preconditions.checkArgument(children.isEmpty());
-    return new MockSubScanPOP(url, readEntries);
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 61c9383..0a329d6 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -25,30 +25,31 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.Scan;
+import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.physical.config.Filter;
-import org.apache.drill.exec.physical.config.MockScanBatchCreator;
 import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.physical.config.RandomReceiver;
 import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.config.SelectionVectorRemover;
 import org.apache.drill.exec.physical.config.SingleSender;
 import org.apache.drill.exec.physical.config.Sort;
-import org.apache.drill.exec.physical.base.*;
-import org.apache.drill.exec.physical.config.*;
 import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator;
 import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator;
 import org.apache.drill.exec.physical.impl.sort.SortBatchCreator;
 import org.apache.drill.exec.physical.impl.svremover.SVRemoverCreator;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.json.JSONScanBatchCreator;
+import org.apache.drill.exec.store.json.JSONSubScan;
+import org.apache.drill.exec.store.mock.MockGroupScanPOP;
+import org.apache.drill.exec.store.mock.MockScanBatchCreator;
+import org.apache.drill.exec.store.mock.MockSubScanPOP;
+import org.apache.drill.exec.store.parquet.ParquetRowGroupScan;
+import org.apache.drill.exec.store.parquet.ParquetScanBatchCreator;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import org.apache.drill.exec.store.parquet.ParquetGroupScan;
-import org.apache.drill.exec.store.parquet.ParquetRowGroupScan;
-import org.apache.drill.exec.store.parquet.ParquetScanBatchCreator;
 
-public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentContext, ExecutionSetupException>{
+public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentContext, ExecutionSetupException> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ImplCreator.class);
 
   private MockScanBatchCreator msc = new MockScanBatchCreator();
@@ -62,9 +63,10 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
   private SortBatchCreator sbc = new SortBatchCreator();
   private RootExec root = null;
 
-  private ImplCreator(){}
+  private ImplCreator() {
+  }
 
-  public RootExec getRoot(){
+  public RootExec getRoot() {
     return root;
   }
 
@@ -78,20 +80,13 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
     Preconditions.checkNotNull(subScan);
     Preconditions.checkNotNull(context);
 
-    if(subScan instanceof MockSubScanPOP){
+    if (subScan instanceof MockSubScanPOP) {
       return msc.getBatch(context, (MockSubScanPOP) subScan, Collections.<RecordBatch> emptyList());
-    
-    if(scan instanceof MockScanPOP){
-      return msc.getBatch(context, (MockScanPOP) scan, Collections.<RecordBatch>emptyList());
-    } else if(scan instanceof JSONScanPOP) {
-      return new JSONScanBatchCreator().getBatch(context, (JSONScanPOP)scan, Collections.<RecordBatch>emptyList());
-    }else{
-      return super.visitScan(scan, context);  
-    }
-    else if (subScan instanceof ParquetRowGroupScan){
-      return parquetScan.getBatch(context, (ParquetRowGroupScan) subScan,  Collections.<RecordBatch> emptyList());
-    }
-    else{
+    } else if (subScan instanceof JSONSubScan) {
+      return new JSONScanBatchCreator().getBatch(context, (JSONSubScan) subScan, Collections.<RecordBatch> emptyList());
+    } else if (subScan instanceof ParquetRowGroupScan) {
+      return parquetScan.getBatch(context, (ParquetRowGroupScan) subScan, Collections.<RecordBatch> emptyList());
+    } else {
       return super.visitSubScan(subScan, context);
     }
 
@@ -99,14 +94,13 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
 
   @Override
   public RecordBatch visitOp(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException {
-    if(op instanceof SelectionVectorRemover){
+    if (op instanceof SelectionVectorRemover) {
       return svc.getBatch(context, (SelectionVectorRemover) op, getChildren(op, context));
-    }else{
+    } else {
       return super.visitOp(op, context);
     }
   }
 
-  
   @Override
   public RecordBatch visitSort(Sort sort, FragmentContext context) throws ExecutionSetupException {
     return sbc.getBatch(context, sort, getChildren(sort, context));
@@ -135,18 +129,20 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
     return rrc.getBatch(context, op, null);
   }
 
-  private List<RecordBatch> getChildren(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException{
+  private List<RecordBatch> getChildren(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException {
     List<RecordBatch> children = Lists.newArrayList();
-    for(PhysicalOperator child : op){
+    for (PhysicalOperator child : op) {
       children.add(child.accept(this, context));
     }
     return children;
   }
 
-  public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException{
+  public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
     ImplCreator i = new ImplCreator();
     root.accept(i, context);
-    if(i.root == null) throw new ExecutionSetupException("The provided fragment did not have a root node that correctly created a RootExec value.");
+    if (i.root == null)
+      throw new ExecutionSetupException(
+          "The provided fragment did not have a root node that correctly created a RootExec value.");
     return i.getRoot();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
deleted file mode 100644
index 8a2de63..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
+++ /dev/null
@@ -1,486 +0,0 @@
-package org.apache.drill.exec.store;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-import com.google.common.base.Charsets;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.Files;
-import com.google.common.io.InputSupplier;
-import com.google.common.io.Resources;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.schema.*;
-import org.apache.drill.exec.schema.json.jackson.JacksonHelper;
-import org.apache.drill.exec.vector.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.Map;
-
-import static com.fasterxml.jackson.core.JsonToken.*;
-
-public class JSONRecordReader implements RecordReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
-  private static final int DEFAULT_LENGTH = 256 * 1024; // 256kb
-  public static final Charset UTF_8 = Charset.forName("UTF-8");
-
-  private final String inputPath;
-
-  private final Map<String, VectorHolder> valueVectorMap;
-
-  private JsonParser parser;
-  private SchemaIdGenerator generator;
-  private DiffSchema diffSchema;
-  private RecordSchema currentSchema;
-  private List<Field> removedFields;
-  private OutputMutator outputMutator;
-  private BufferAllocator allocator;
-  private int batchSize;
-
-  public JSONRecordReader(FragmentContext fragmentContext, String inputPath, int batchSize) {
-    this.inputPath = inputPath;
-    this.allocator = fragmentContext.getAllocator();
-    this.batchSize = batchSize;
-    valueVectorMap = Maps.newHashMap();
-  }
-
-  public JSONRecordReader(FragmentContext fragmentContext, String inputPath) {
-    this(fragmentContext, inputPath, DEFAULT_LENGTH);
-  }
-
-  private JsonParser getParser() {
-    return parser;
-  }
-
-  @Override
-  public void setup(OutputMutator output) throws ExecutionSetupException {
-    outputMutator = output;
-    currentSchema = new ObjectSchema();
-    diffSchema = new DiffSchema();
-    removedFields = Lists.newArrayList();
-
-    try {
-      InputSupplier<InputStreamReader> input;
-      if (inputPath.startsWith("resource:")) {
-        input = Resources.newReaderSupplier(Resources.getResource(inputPath.substring(9)), Charsets.UTF_8);
-      } else {
-        input = Files.newReaderSupplier(new File(URI.create(inputPath)), Charsets.UTF_8);
-      }
-
-      JsonFactory factory = new JsonFactory();
-      parser = factory.createJsonParser(input.getInput());
-      parser.nextToken(); // Read to the first START_OBJECT token
-      generator = new SchemaIdGenerator();
-    } catch (IOException e) {
-      throw new ExecutionSetupException(e);
-    }
-  }
-
-  @Override
-  public int next() {
-    if (parser.isClosed() || !parser.hasCurrentToken()) {
-      return 0;
-    }
-
-    resetBatch();
-
-    int nextRowIndex = 0;
-
-    try {
-      while (ReadType.OBJECT.readRecord(this, null, nextRowIndex++, 0)) {
-        parser.nextToken(); // Read to START_OBJECT token
-
-        if (!parser.hasCurrentToken()) {
-          parser.close();
-          break;
-        }
-      }
-
-      parser.nextToken();
-
-      if (!parser.hasCurrentToken()) {
-        parser.close();
-      }
-
-      // Garbage collect fields never referenced in this batch
-      for (Field field : Iterables.concat(currentSchema.removeUnreadFields(), removedFields)) {
-        diffSchema.addRemovedField(field);
-        outputMutator.removeField(field.getAsMaterializedField());
-      }
-
-      if (diffSchema.isChanged()) {
-        outputMutator.setNewSchema();
-      }
-
-
-    } catch (IOException | SchemaChangeException e) {
-      logger.error("Error reading next in Json reader", e);
-    }
-
-    for (VectorHolder holder : valueVectorMap.values()) {
-      holder.populateVectorLength();
-    }
-
-    return nextRowIndex;
-  }
-
-  private void resetBatch() {
-    for (VectorHolder value : valueVectorMap.values()) {
-      value.reset();
-    }
-
-    currentSchema.resetMarkedFields();
-    diffSchema.reset();
-    removedFields.clear();
-  }
-
-  @Override
-  public void cleanup() {
-    try {
-      parser.close();
-    } catch (IOException e) {
-      logger.warn("Error closing Json parser", e);
-    }
-  }
-
-
-  private RecordSchema getCurrentSchema() {
-    return currentSchema;
-  }
-
-  private void setCurrentSchema(RecordSchema schema) {
-    currentSchema = schema;
-  }
-
-  private List<Field> getRemovedFields() {
-    return removedFields;
-  }
-
-  public BufferAllocator getAllocator() {
-    return allocator;
-  }
-
-  public static enum ReadType {
-    ARRAY(END_ARRAY) {
-      @Override
-      public Field createField(RecordSchema parentSchema, String prefixFieldName, String fieldName, MajorType fieldType, int index) {
-        return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
-      }
-
-      @Override
-      public RecordSchema createSchema() throws IOException {
-        return new ObjectSchema();
-      }
-    },
-    OBJECT(END_OBJECT) {
-      @Override
-      public Field createField(RecordSchema parentSchema,
-                               String prefixFieldName,
-                               String fieldName,
-                               MajorType fieldType,
-                               int index) {
-        return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
-      }
-
-      @Override
-      public RecordSchema createSchema() throws IOException {
-        return new ObjectSchema();
-      }
-    };
-
-    private final JsonToken endObject;
-
-    ReadType(JsonToken endObject) {
-      this.endObject = endObject;
-    }
-
-    public JsonToken getEndObject() {
-      return endObject;
-    }
-
-    @SuppressWarnings("ConstantConditions")
-    public boolean readRecord(JSONRecordReader reader,
-                              String prefixFieldName,
-                              int rowIndex,
-                              int groupCount) throws IOException, SchemaChangeException {
-      JsonParser parser = reader.getParser();
-      JsonToken token = parser.nextToken();
-      JsonToken endObject = getEndObject();
-      int colIndex = 0;
-      boolean isFull = false;
-      while (token != endObject) {
-        if (token == FIELD_NAME) {
-          token = parser.nextToken();
-          continue;
-        }
-
-        String fieldName = parser.getCurrentName();
-        MajorType fieldType = JacksonHelper.getFieldType(token, this == ReadType.ARRAY);
-        ReadType readType = null;
-        switch (token) {
-          case START_ARRAY:
-            readType = ReadType.ARRAY;
-            groupCount++;
-            break;
-          case START_OBJECT:
-            readType = ReadType.OBJECT;
-            groupCount = 0;
-            break;
-        }
-
-        if (fieldType != null) { // Including nulls
-          boolean currentFieldFull = !recordData(
-              readType,
-              reader,
-              fieldType,
-              prefixFieldName,
-              fieldName,
-              rowIndex,
-              colIndex,
-              groupCount);
-
-          isFull = isFull || currentFieldFull;
-        }
-        token = parser.nextToken();
-        colIndex += 1;
-      }
-      return !isFull;
-    }
-
-    private void removeChildFields(List<Field> removedFields, Field field) {
-      RecordSchema schema = field.getAssignedSchema();
-      if (schema == null) {
-        return;
-      }
-      for (Field childField : schema.getFields()) {
-        removedFields.add(childField);
-        if (childField.hasSchema()) {
-          removeChildFields(removedFields, childField);
-        }
-      }
-    }
-
-    private boolean recordData(JSONRecordReader.ReadType readType,
-                               JSONRecordReader reader,
-                               MajorType fieldType,
-                               String prefixFieldName,
-                               String fieldName,
-                               int rowIndex,
-                               int colIndex,
-                               int groupCount) throws IOException, SchemaChangeException {
-      RecordSchema currentSchema = reader.getCurrentSchema();
-      Field field = currentSchema.getField(fieldName == null ? prefixFieldName : fieldName, colIndex);
-      boolean isFieldFound = field != null;
-      List<Field> removedFields = reader.getRemovedFields();
-      boolean newFieldLateBound = fieldType.getMinorType().equals(MinorType.LATE);
-
-      if (isFieldFound && !field.getFieldType().equals(fieldType)) {
-        boolean existingFieldLateBound = field.getFieldType().getMinorType().equals(MinorType.LATE);
-
-        if (newFieldLateBound && !existingFieldLateBound) {
-          fieldType = Types.overrideMinorType(fieldType, field.getFieldType().getMinorType());
-        } else if (!newFieldLateBound && existingFieldLateBound) {
-          field.setFieldType(Types.overrideMinorType(field.getFieldType(), fieldType.getMinorType()));
-        } else if (!newFieldLateBound && !existingFieldLateBound) {
-          if (field.hasSchema()) {
-            removeChildFields(removedFields, field);
-          }
-          removedFields.add(field);
-          currentSchema.removeField(field, colIndex);
-
-          isFieldFound = false;
-        }
-      }
-
-      if (!isFieldFound) {
-        field = createField(
-            currentSchema,
-            prefixFieldName,
-            fieldName,
-            fieldType,
-            colIndex
-        );
-
-        reader.recordNewField(field);
-        currentSchema.addField(field);
-      }
-
-      field.setRead(true);
-
-      VectorHolder holder = getOrCreateVectorHolder(reader, field);
-      if (readType != null) {
-        RecordSchema fieldSchema = field.getAssignedSchema();
-        RecordSchema newSchema = readType.createSchema();
-
-        if (readType != ReadType.ARRAY) {
-          reader.setCurrentSchema(fieldSchema);
-          if (fieldSchema == null) reader.setCurrentSchema(newSchema);
-          readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount);
-        } else {
-          readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount);
-        }
-
-        reader.setCurrentSchema(currentSchema);
-
-      } else if (holder != null && !newFieldLateBound && fieldType.getMinorType() != MinorType.LATE) {
-        return addValueToVector(
-            rowIndex,
-            holder,
-            JacksonHelper.getValueFromFieldType(
-                reader.getParser(),
-                fieldType.getMinorType()
-            ),
-            fieldType.getMinorType(),
-            groupCount
-        );
-      }
-
-      return true;
-    }
-
-    private static <T> boolean addValueToVector(int index, VectorHolder holder, T val, MinorType minorType, int groupCount) {
-      switch (minorType) {
-        case INT: {
-          holder.incAndCheckLength(NullableIntHolder.WIDTH * 8 + 1);
-          if (groupCount == 0) {
-            if (val != null) {
-              NullableIntVector int4 = (NullableIntVector) holder.getValueVector();
-              NullableIntVector.Mutator m = int4.getMutator();
-              m.set(index, (Integer) val);
-            }
-          } else {
-            if (val == null) {
-              throw new UnsupportedOperationException("Nullable repeated int is not supported.");
-            }
-
-            RepeatedIntVector repeatedInt4 = (RepeatedIntVector) holder.getValueVector();
-            RepeatedIntVector.Mutator m = repeatedInt4.getMutator();
-            holder.setGroupCount(index);
-            m.add(index, (Integer) val);
-          }
-
-          return holder.hasEnoughSpace(NullableIntHolder.WIDTH * 8 + 1);
-        }
-        case FLOAT4: {
-          holder.incAndCheckLength(NullableFloat4Holder.WIDTH * 8 + 1);
-          if (groupCount == 0) {
-            if (val != null) {
-              NullableFloat4Vector float4 = (NullableFloat4Vector) holder.getValueVector();
-              NullableFloat4Vector.Mutator m = float4.getMutator();
-              m.set(index, (Float) val);
-            }
-          } else {
-            if (val == null) {
-              throw new UnsupportedOperationException("Nullable repeated float is not supported.");
-            }
-
-            RepeatedFloat4Vector repeatedFloat4 = (RepeatedFloat4Vector) holder.getValueVector();
-            RepeatedFloat4Vector.Mutator m = repeatedFloat4.getMutator();
-            holder.setGroupCount(index);
-            m.add(index, (Float) val);
-          }
-          return holder.hasEnoughSpace(NullableFloat4Holder.WIDTH * 8 + 1);
-        }
-        case VARCHAR: {
-          if (val == null) {
-            return (index + 1) * 4 <= holder.getLength();
-          } else {
-            byte[] bytes = ((String) val).getBytes(UTF_8);
-            int length = bytes.length;
-            holder.incAndCheckLength(length);
-            if (groupCount == 0) {
-              NullableVarCharVector varLen4 = (NullableVarCharVector) holder.getValueVector();
-              NullableVarCharVector.Mutator m = varLen4.getMutator();
-              m.set(index, bytes);
-            } else {
-              RepeatedVarCharVector repeatedVarLen4 = (RepeatedVarCharVector) holder.getValueVector();
-              RepeatedVarCharVector.Mutator m = repeatedVarLen4.getMutator();
-              holder.setGroupCount(index);
-              m.add(index, bytes);
-            }
-            return holder.hasEnoughSpace(length + 4 + 1);
-          }
-        }
-        case BIT: {
-          holder.incAndCheckLength(NullableBitHolder.WIDTH + 1);
-          if (groupCount == 0) {
-            if (val != null) {
-              NullableBitVector bit = (NullableBitVector) holder.getValueVector();
-              NullableBitVector.Mutator m = bit.getMutator();
-              m.set(index, (Boolean) val ? 1 : 0);
-            }
-          } else {
-            if (val == null) {
-              throw new UnsupportedOperationException("Nullable repeated boolean is not supported.");
-            }
-
-            RepeatedBitVector repeatedBit = (RepeatedBitVector) holder.getValueVector();
-            RepeatedBitVector.Mutator m = repeatedBit.getMutator();
-            holder.setGroupCount(index);
-            m.add(index, (Boolean) val ? 1 : 0);
-          }
-          return holder.hasEnoughSpace(NullableBitHolder.WIDTH + 1);
-        }
-        default:
-          throw new DrillRuntimeException("Type not supported to add value. Type: " + minorType);
-      }
-    }
-
-    private VectorHolder getOrCreateVectorHolder(JSONRecordReader reader, Field field) throws SchemaChangeException {
-      return reader.getOrCreateVectorHolder(field);
-    }
-
-    public abstract RecordSchema createSchema() throws IOException;
-
-    public abstract Field createField(RecordSchema parentSchema,
-                                      String prefixFieldName,
-                                      String fieldName,
-                                      MajorType fieldType,
-                                      int index);
-  }
-
-  private void recordNewField(Field field) {
-    diffSchema.recordNewField(field);
-  }
-
-  private VectorHolder getOrCreateVectorHolder(Field field) throws SchemaChangeException {
-    String fullFieldName = field.getFullFieldName();
-    VectorHolder holder = valueVectorMap.get(fullFieldName);
-
-    if (holder == null) {
-      MajorType type = field.getFieldType();
-      MinorType minorType = type.getMinorType();
-
-      if (minorType.equals(MinorType.MAP) || minorType.equals(MinorType.LATE)) {
-        return null;
-      }
-
-      MaterializedField f = MaterializedField.create(new SchemaPath(fullFieldName, ExpressionPosition.UNKNOWN), type);
-
-      ValueVector v = TypeHelper.getNewVector(f, allocator);
-      AllocationHelper.allocate(v, batchSize, 50);
-      holder = new VectorHolder(v);
-      valueVectorMap.put(fullFieldName, holder);
-      outputMutator.addField(v);
-      return holder;
-    }
-    return holder;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
index 7cbea57..828ae17 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
@@ -27,11 +27,16 @@ public class VectorHolder {
   private ValueVector vector;
   private int currentLength;
 
-    public VectorHolder(int length, ValueVector vector) {
-        this.length = length;
-        this.vector = vector;
-        this.mutator = vector.getMutator();
-    }
+  
+  public VectorHolder(int length, ValueVector vector) {
+    this.length = length;
+    this.vector = vector;
+  }
+  
+  public VectorHolder(ValueVector vector) {
+    this.length = vector.getValueCapacity();
+    this.vector = vector;
+  }
 
   public ValueVector getValueVector() {
     return vector;
@@ -47,9 +52,9 @@ public class VectorHolder {
   }
 
   public void setGroupCount(int groupCount) {
-    if(this.groupCount < groupCount) {
+    if (this.groupCount < groupCount) {
       RepeatedMutator mutator = (RepeatedMutator) vector.getMutator();
-      while(this.groupCount < groupCount) {
+      while (this.groupCount < groupCount) {
         mutator.startNewGroup(++this.groupCount);
       }
     }
@@ -71,7 +76,7 @@ public class VectorHolder {
 
   public void populateVectorLength() {
     ValueVector.Mutator mutator = vector.getMutator();
-    if(vector instanceof RepeatedFixedWidthVector || vector instanceof RepeatedVariableWidthVector) {
+    if (vector instanceof RepeatedFixedWidthVector || vector instanceof RepeatedVariableWidthVector) {
       mutator.setValueCount(groupCount);
     } else {
       mutator.setValueCount(count);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONGroupScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONGroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONGroupScan.java
new file mode 100644
index 0000000..ff5f474
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONGroupScan.java
@@ -0,0 +1,145 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.store.json;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.CoordinationProtos;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * Group scan over a list of JSON inputs.
+ *
+ * The aggregate cost and size are the sums of the per-entry values, computed once at
+ * construction. {@link #applyAssignments(List)} distributes the read entries round-robin
+ * over the given endpoints, and {@link #getSpecificScan(int)} hands out the resulting
+ * per-fragment {@link JSONSubScan}.
+ */
+@JsonTypeName("json-scan")
+public class JSONGroupScan extends AbstractGroupScan {
+    private static final int ESTIMATED_RECORD_SIZE = 1024; // 1kb
+
+    /** Read entries per minor fragment; stays null until applyAssignments has been called. */
+    private LinkedList<JSONGroupScan.ScanEntry>[] mappings;
+    protected final List<JSONGroupScan.ScanEntry> readEntries;
+    private final OperatorCost cost;
+    private final Size size;
+
+    /**
+     * @param readEntries the JSON inputs to scan; their costs and sizes are summed eagerly
+     */
+    @JsonCreator
+    public JSONGroupScan(@JsonProperty("entries") List<JSONGroupScan.ScanEntry> readEntries) {
+        this.readEntries = readEntries;
+        OperatorCost totalCost = new OperatorCost(0, 0, 0, 0);
+        Size totalSize = new Size(0, 0);
+        for (JSONGroupScan.ScanEntry entry : readEntries) {
+            totalCost = totalCost.add(entry.getCost());
+            totalSize = totalSize.add(entry.getSize());
+        }
+        this.cost = totalCost;
+        this.size = totalSize;
+    }
+
+    /**
+     * Assigns the read entries to the endpoints in round-robin order.
+     *
+     * @param endpoints the endpoints to spread the entries over; there must not be more
+     *                  endpoints than read entries, and the list may only be empty when
+     *                  there are no read entries
+     * @throws IllegalArgumentException if either of those conditions is violated
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) {
+        checkArgument(endpoints.size() <= readEntries.size());
+        // Without this check an empty endpoint list would fail below with an index error.
+        checkArgument(!endpoints.isEmpty() || readEntries.isEmpty(),
+            "Cannot assign [%s] read entries to an empty endpoint list.", readEntries.size());
+
+        final int width = endpoints.size();
+        // Generic arrays cannot be created directly; this array only ever holds LinkedList<ScanEntry>.
+        mappings = new LinkedList[width];
+
+        int i = 0;
+        for (ScanEntry e : readEntries) {
+            final int slot = i % width;
+            LinkedList<ScanEntry> entries = mappings[slot];
+            if (entries == null) {
+                entries = new LinkedList<>();
+                mappings[slot] = entries;
+            }
+            entries.add(e);
+            i++;
+        }
+    }
+
+    /**
+     * Returns the sub scan holding the entries assigned to one minor fragment.
+     *
+     * @param minorFragmentId index into the assignments made by {@link #applyAssignments(List)}
+     * @throws IllegalStateException    if no assignments have been applied yet
+     * @throws IllegalArgumentException if the id is not smaller than the number of assignments
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public SubScan getSpecificScan(int minorFragmentId) {
+        if (mappings == null) {
+            throw new IllegalStateException("applyAssignments must be called before getSpecificScan.");
+        }
+        checkArgument(minorFragmentId < mappings.length, "Mappings length [%s] should be longer than minor fragment id [%s] but it isn't.", mappings.length, minorFragmentId);
+        return new JSONSubScan(mappings[minorFragmentId]);
+    }
+
+    /** This scan reports no endpoint affinity. */
+    @Override
+    public List<EndpointAffinity> getOperatorAffinity() {
+        return Collections.emptyList();
+    }
+
+    public List<JSONGroupScan.ScanEntry> getReadEntries() {
+        return readEntries;
+    }
+
+    /**
+     * Returns a fresh group scan over the same read entries. The children argument is not
+     * used, and assignments applied to this instance are not carried over.
+     */
+    @Override
+    public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+        return new JSONGroupScan(readEntries);
+    }
+
+    /** A single JSON input, identified by URL, with a size estimate derived from its file length. */
+    public static class ScanEntry implements ReadEntry {
+        private final String url;
+        private final Size size;
+
+        /**
+         * @param url a file URI, or a "resource:" URL as accepted by the JSON record reader
+         */
+        @JsonCreator
+        public ScanEntry(@JsonProperty("url") String url) {
+            this.url = url;
+            this.size = new Size(estimateLength(url) / ESTIMATED_RECORD_SIZE, ESTIMATED_RECORD_SIZE);
+        }
+
+        /**
+         * Returns the length in bytes used for the size estimate. "resource:" URLs are not
+         * file URIs, so java.io.File cannot measure them; they are reported as length 0,
+         * the same value File.length() yields for a file that does not exist.
+         */
+        private static long estimateLength(String url) {
+            if (url.startsWith("resource:")) {
+                return 0L;
+            }
+            return new File(URI.create(url)).length();
+        }
+
+        @Override
+        public OperatorCost getCost() {
+            return new OperatorCost(1, 1, 2, 2);
+        }
+
+        @Override
+        public Size getSize() {
+            return size;
+        }
+
+        public String getUrl() {
+            return url;
+        }
+    }
+
+    /** At most one minor fragment per read entry. */
+    @Override
+    public int getMaxParallelizationWidth() {
+        return readEntries.size();
+    }
+
+    @Override
+    public OperatorCost getCost() {
+        return cost;
+    }
+
+    @Override
+    public Size getSize() {
+        return size;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONRecordReader.java
new file mode 100644
index 0000000..eee0fb6
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONRecordReader.java
@@ -0,0 +1,489 @@
+package org.apache.drill.exec.store.json;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import com.google.common.io.InputSupplier;
+import com.google.common.io.Resources;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.schema.*;
+import org.apache.drill.exec.schema.json.jackson.JacksonHelper;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.VectorHolder;
+import org.apache.drill.exec.vector.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+import static com.fasterxml.jackson.core.JsonToken.*;
+
+public class JSONRecordReader implements RecordReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
+  private static final int DEFAULT_LENGTH = 256 * 1024; // 256kb
+  public static final Charset UTF_8 = Charset.forName("UTF-8");
+
+  private final String inputPath;
+
+  private final Map<String, VectorHolder> valueVectorMap;
+
+  private JsonParser parser;
+  private SchemaIdGenerator generator;
+  private DiffSchema diffSchema;
+  private RecordSchema currentSchema;
+  private List<Field> removedFields;
+  private OutputMutator outputMutator;
+  private BufferAllocator allocator;
+  private int batchSize;
+
+  /**
+   * Creates a reader for one JSON input with an explicit batch size.
+   *
+   * @param fragmentContext supplies the buffer allocator used for the value vectors
+   * @param inputPath       location of the JSON input, opened later by {@code setup}
+   * @param batchSize       value handed to the vector allocation helper for each new vector
+   */
+  public JSONRecordReader(FragmentContext fragmentContext, String inputPath, int batchSize) {
+    this.valueVectorMap = Maps.newHashMap();
+    this.batchSize = batchSize;
+    this.allocator = fragmentContext.getAllocator();
+    this.inputPath = inputPath;
+  }
+
+  /**
+   * Creates a reader for one JSON input using {@code DEFAULT_LENGTH} as the batch size.
+   */
+  public JSONRecordReader(FragmentContext fragmentContext, String inputPath) {
+    this(fragmentContext, inputPath, DEFAULT_LENGTH);
+  }
+
+  /** Returns the JSON parser created by {@code setup}; null until setup has created it. */
+  private JsonParser getParser() {
+    return parser;
+  }
+
+  /**
+   * Prepares the reader: stores the output mutator, starts from an empty schema and opens
+   * a JSON parser over the input, advancing it to the first token.
+   *
+   * @param output receives the vectors and schema changes produced while reading
+   * @throws ExecutionSetupException if the input cannot be opened or its first token cannot be read
+   */
+  @Override
+  public void setup(OutputMutator output) throws ExecutionSetupException {
+    final String resourcePrefix = "resource:";
+
+    outputMutator = output;
+    currentSchema = new ObjectSchema();
+    diffSchema = new DiffSchema();
+    removedFields = Lists.newArrayList();
+
+    try {
+      // Prefixed paths are looked up with Resources.getResource; anything else is read as a file URI.
+      final InputSupplier<InputStreamReader> source = inputPath.startsWith(resourcePrefix)
+          ? Resources.newReaderSupplier(
+              Resources.getResource(inputPath.substring(resourcePrefix.length())), Charsets.UTF_8)
+          : Files.newReaderSupplier(new File(URI.create(inputPath)), Charsets.UTF_8);
+
+      parser = new JsonFactory().createJsonParser(source.getInput());
+      parser.nextToken(); // Read to the first START_OBJECT token
+      generator = new SchemaIdGenerator();
+    } catch (IOException e) {
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  /**
+   * Reads the next batch of records from the JSON input into the value vectors.
+   *
+   * Returns 0 straight away when the parser is closed or has no current token. Otherwise
+   * the per-batch state is reset and top-level objects are read one at a time until
+   * {@code readRecord} returns false or the parser runs out of tokens. Fields that were
+   * not read in this batch, plus fields removed while reading, are reported to the output
+   * mutator, and a new schema is signalled when the diff schema reports a change.
+   *
+   * I/O and schema-change errors are logged, not rethrown; the vector lengths are
+   * populated and the row counter is returned in that case as well.
+   *
+   * @return the row counter, which is incremented once for every {@code readRecord} call
+   */
+  @Override
+  public int next() {
+    if (parser.isClosed() || !parser.hasCurrentToken()) {
+      return 0;
+    }
+
+    resetBatch();
+
+    int nextRowIndex = 0;
+
+    try {
+      // The post-increment runs on every evaluation, including the one that ends the loop.
+      while (ReadType.OBJECT.readRecord(this, null, nextRowIndex++, 0)) {
+        parser.nextToken(); // Read to START_OBJECT token
+
+        if (!parser.hasCurrentToken()) {
+          parser.close();
+          break;
+        }
+      }
+
+      // Advance once more after the loop; this also runs when the loop closed the parser above.
+      parser.nextToken();
+
+      if (!parser.hasCurrentToken()) {
+        parser.close();
+      }
+
+      // Garbage collect fields never referenced in this batch
+      for (Field field : Iterables.concat(currentSchema.removeUnreadFields(), removedFields)) {
+        diffSchema.addRemovedField(field);
+        outputMutator.removeField(field.getAsMaterializedField());
+      }
+
+      if (diffSchema.isChanged()) {
+        outputMutator.setNewSchema();
+      }
+
+
+    } catch (IOException | SchemaChangeException e) {
+      // NOTE(review): the error is only logged, so callers cannot tell a failed batch from a short one.
+      logger.error("Error reading next in Json reader", e);
+    }
+
+    for (VectorHolder holder : valueVectorMap.values()) {
+      holder.populateVectorLength();
+    }
+
+    return nextRowIndex;
+  }
+
+  /**
+   * Clears the per-batch state before a new batch is read: resets every vector holder,
+   * then the schema read marks, the diff schema and the list of removed fields.
+   */
+  private void resetBatch() {
+    for (final VectorHolder holder : valueVectorMap.values()) {
+      holder.reset();
+    }
+
+    // Schema bookkeeping starts fresh for each batch.
+    currentSchema.resetMarkedFields();
+    diffSchema.reset();
+    removedFields.clear();
+  }
+
+  /**
+   * Closes the JSON parser if one was created. Safe to call when {@code setup} never ran
+   * or failed before creating the parser. A failure to close is logged and not propagated.
+   */
+  @Override
+  public void cleanup() {
+    if (parser == null) {
+      return; // setup() did not get as far as creating a parser, so there is nothing to close
+    }
+    try {
+      parser.close();
+    } catch (IOException e) {
+      logger.warn("Error closing Json parser", e);
+    }
+  }
+
+
+  /** Returns the schema that fields are currently looked up in and added to. */
+  private RecordSchema getCurrentSchema() {
+    return currentSchema;
+  }
+
+  /** Replaces the current schema; used by ReadType.recordData while it reads a nested object. */
+  private void setCurrentSchema(RecordSchema schema) {
+    currentSchema = schema;
+  }
+
+  /** Returns the live list of fields removed during the current batch (cleared by resetBatch). */
+  private List<Field> getRemovedFields() {
+    return removedFields;
+  }
+
+  /** Returns the buffer allocator obtained from the fragment context at construction. */
+  public BufferAllocator getAllocator() {
+    return allocator;
+  }
+
+  public static enum ReadType {
+    // Reads the values of a JSON array; readRecord stops at END_ARRAY.
+    ARRAY(END_ARRAY) {
+      // The index argument is not used; the field is always a NamedField.
+      @Override
+      public Field createField(RecordSchema parentSchema, String prefixFieldName, String fieldName, MajorType fieldType, int index) {
+        return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
+      }
+
+      @Override
+      public RecordSchema createSchema() throws IOException {
+        return new ObjectSchema();
+      }
+    },
+    // Reads the fields of a JSON object; readRecord stops at END_OBJECT.
+    OBJECT(END_OBJECT) {
+      // The index argument is not used; the field is always a NamedField.
+      @Override
+      public Field createField(RecordSchema parentSchema,
+                               String prefixFieldName,
+                               String fieldName,
+                               MajorType fieldType,
+                               int index) {
+        return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
+      }
+
+      @Override
+      public RecordSchema createSchema() throws IOException {
+        return new ObjectSchema();
+      }
+    };
+
+    // Token that terminates this kind of structure (END_ARRAY or END_OBJECT).
+    private final JsonToken endObject;
+
+    ReadType(JsonToken endObject) {
+      this.endObject = endObject;
+    }
+
+    /** Returns the token at which readRecord stops reading for this read type. */
+    public JsonToken getEndObject() {
+      return endObject;
+    }
+
+    /**
+     * Reads tokens from the reader's parser until this read type's end token is reached,
+     * recording every value through {@code recordData}.
+     *
+     * FIELD_NAME tokens are skipped; the name of a value is taken from the parser's
+     * current name. A START_ARRAY or START_OBJECT token selects the read type that
+     * recordData uses to read the nested structure.
+     *
+     * @param reader          the record reader that owns the parser, schema and vectors
+     * @param prefixFieldName name prefix passed on to recordData (null at the top level)
+     * @param rowIndex        index of the row being read
+     * @param groupCount      passed on to recordData; incremented on START_ARRAY and set to 0
+     *                        on START_OBJECT
+     * @return false if recordData returned false for any value, true otherwise
+     */
+    @SuppressWarnings("ConstantConditions")
+    public boolean readRecord(JSONRecordReader reader,
+                              String prefixFieldName,
+                              int rowIndex,
+                              int groupCount) throws IOException, SchemaChangeException {
+      JsonParser parser = reader.getParser();
+      JsonToken token = parser.nextToken();
+      JsonToken endObject = getEndObject();
+      int colIndex = 0;
+      boolean isFull = false;
+      // NOTE(review): a null token (input ending before the end token) is not handled here and
+      // looks like it would fail at the switch below - confirm.
+      while (token != endObject) {
+        if (token == FIELD_NAME) {
+          token = parser.nextToken();
+          continue;
+        }
+
+        String fieldName = parser.getCurrentName();
+        MajorType fieldType = JacksonHelper.getFieldType(token, this == ReadType.ARRAY);
+        ReadType readType = null;
+        // NOTE(review): groupCount is a local that keeps its new value for the values that follow
+        // in this same structure - confirm that carry-over is intended.
+        switch (token) {
+          case START_ARRAY:
+            readType = ReadType.ARRAY;
+            groupCount++;
+            break;
+          case START_OBJECT:
+            readType = ReadType.OBJECT;
+            groupCount = 0;
+            break;
+        }
+
+        if (fieldType != null) { // Including nulls
+          boolean currentFieldFull = !recordData(
+              readType,
+              reader,
+              fieldType,
+              prefixFieldName,
+              fieldName,
+              rowIndex,
+              colIndex,
+              groupCount);
+
+          isFull = isFull || currentFieldFull;
+        }
+        token = parser.nextToken();
+        colIndex += 1;
+      }
+      return !isFull;
+    }
+
+    /**
+     * Appends every field below {@code field} to {@code removedFields}, walking nested
+     * schemas depth-first. Does nothing when the field has no assigned schema.
+     */
+    private void removeChildFields(List<Field> removedFields, Field field) {
+      final RecordSchema assigned = field.getAssignedSchema();
+      if (assigned != null) {
+        for (final Field child : assigned.getFields()) {
+          removedFields.add(child);
+          // Descend only into children that carry a schema of their own.
+          if (child.hasSchema()) {
+            removeChildFields(removedFields, child);
+          }
+        }
+      }
+    }
+
+    /**
+     * Records one value, or one nested array/object, for the current row.
+     *
+     * The field is looked up in the reader's current schema by name and column index;
+     * prefixFieldName is used as the name when fieldName is null. A type mismatch with an
+     * existing field is reconciled around LATE-bound types, or the existing field (and its
+     * children) is removed and a new field created. The field is then marked as read.
+     * A nested structure (readType not null) is read recursively; a scalar is written to
+     * the field's vector.
+     *
+     * @param readType   read type of a nested array/object, or null for a scalar value
+     * @param groupCount passed through to nested reads and to addValueToVector
+     * @return the result of addValueToVector for a scalar that was written; true otherwise
+     */
+    private boolean recordData(JSONRecordReader.ReadType readType,
+                               JSONRecordReader reader,
+                               MajorType fieldType,
+                               String prefixFieldName,
+                               String fieldName,
+                               int rowIndex,
+                               int colIndex,
+                               int groupCount) throws IOException, SchemaChangeException {
+      RecordSchema currentSchema = reader.getCurrentSchema();
+      Field field = currentSchema.getField(fieldName == null ? prefixFieldName : fieldName, colIndex);
+      boolean isFieldFound = field != null;
+      List<Field> removedFields = reader.getRemovedFields();
+      // Captured before fieldType may be overridden below, so it reflects the incoming type.
+      boolean newFieldLateBound = fieldType.getMinorType().equals(MinorType.LATE);
+
+      if (isFieldFound && !field.getFieldType().equals(fieldType)) {
+        boolean existingFieldLateBound = field.getFieldType().getMinorType().equals(MinorType.LATE);
+
+        if (newFieldLateBound && !existingFieldLateBound) {
+          // Incoming value has no concrete type yet: adopt the existing field's minor type.
+          fieldType = Types.overrideMinorType(fieldType, field.getFieldType().getMinorType());
+        } else if (!newFieldLateBound && existingFieldLateBound) {
+          // Existing field was late-bound: bind it to the incoming concrete minor type.
+          field.setFieldType(Types.overrideMinorType(field.getFieldType(), fieldType.getMinorType()));
+        } else if (!newFieldLateBound && !existingFieldLateBound) {
+          // Two different concrete types: drop the old field and its children, then recreate below.
+          if (field.hasSchema()) {
+            removeChildFields(removedFields, field);
+          }
+          removedFields.add(field);
+          currentSchema.removeField(field, colIndex);
+
+          isFieldFound = false;
+        }
+      }
+
+      if (!isFieldFound) {
+        field = createField(
+            currentSchema,
+            prefixFieldName,
+            fieldName,
+            fieldType,
+            colIndex
+        );
+
+        reader.recordNewField(field);
+        currentSchema.addField(field);
+      }
+
+      field.setRead(true);
+
+      VectorHolder holder = getOrCreateVectorHolder(reader, field);
+      if (readType != null) {
+        RecordSchema fieldSchema = field.getAssignedSchema();
+        RecordSchema newSchema = readType.createSchema();
+
+        // Objects are read against the field's own schema (or a fresh one); arrays keep the
+        // reader's current schema. The nested readRecord result is not used here.
+        // NOTE(review): newSchema is not attached to the field in this method - confirm it is
+        // assigned elsewhere.
+        if (readType != ReadType.ARRAY) {
+          reader.setCurrentSchema(fieldSchema);
+          if (fieldSchema == null) reader.setCurrentSchema(newSchema);
+          readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount);
+        } else {
+          readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount);
+        }
+
+        // Restore the schema that was current when this method was entered.
+        reader.setCurrentSchema(currentSchema);
+
+      } else if (holder != null && !newFieldLateBound && fieldType.getMinorType() != MinorType.LATE) {
+        // Scalar with a concrete type and a backing vector: write it.
+        return addValueToVector(
+            rowIndex,
+            holder,
+            JacksonHelper.getValueFromFieldType(
+                reader.getParser(),
+                fieldType.getMinorType()
+            ),
+            fieldType.getMinorType(),
+            groupCount
+        );
+      }
+
+      return true;
+    }
+
+    /**
+     * Writes one value into the holder's vector and reports the holder's
+     * {@code hasEnoughSpace} result for the same size that was just added.
+     *
+     * When groupCount is 0 the vector is treated as the nullable vector of the given type
+     * and a null value writes nothing. Otherwise it is treated as the repeated vector of
+     * that type: the holder's group count is set to index and the value is added; a null
+     * value is rejected for INT, FLOAT4 and BIT.
+     *
+     * @param index      row index to write at
+     * @param holder     wraps the target vector and its length bookkeeping
+     * @param val        the value to write; may be null
+     * @param minorType  selects the vector type; INT, FLOAT4, VARCHAR and BIT are handled
+     * @param groupCount 0 for a nullable vector, anything else for a repeated vector
+     * @return the hasEnoughSpace result; for a null VARCHAR, whether (index + 1) * 4 is
+     *         within the holder's length
+     * @throws UnsupportedOperationException for a null value in a repeated INT, FLOAT4 or BIT vector
+     * @throws DrillRuntimeException         for any other minor type
+     */
+    private static <T> boolean addValueToVector(int index, VectorHolder holder, T val, MinorType minorType, int groupCount) {
+      switch (minorType) {
+        case INT: {
+          // NOTE(review): the size is WIDTH * 8 + 1 here and for FLOAT4 but WIDTH + 1 for BIT -
+          // confirm the unit incAndCheckLength/hasEnoughSpace expect.
+          holder.incAndCheckLength(NullableIntHolder.WIDTH * 8 + 1);
+          if (groupCount == 0) {
+            if (val != null) {
+              NullableIntVector int4 = (NullableIntVector) holder.getValueVector();
+              NullableIntVector.Mutator m = int4.getMutator();
+              m.set(index, (Integer) val);
+            }
+          } else {
+            if (val == null) {
+              throw new UnsupportedOperationException("Nullable repeated int is not supported.");
+            }
+
+            RepeatedIntVector repeatedInt4 = (RepeatedIntVector) holder.getValueVector();
+            RepeatedIntVector.Mutator m = repeatedInt4.getMutator();
+            holder.setGroupCount(index);
+            m.add(index, (Integer) val);
+          }
+
+          return holder.hasEnoughSpace(NullableIntHolder.WIDTH * 8 + 1);
+        }
+        case FLOAT4: {
+          holder.incAndCheckLength(NullableFloat4Holder.WIDTH * 8 + 1);
+          if (groupCount == 0) {
+            if (val != null) {
+              NullableFloat4Vector float4 = (NullableFloat4Vector) holder.getValueVector();
+              NullableFloat4Vector.Mutator m = float4.getMutator();
+              m.set(index, (Float) val);
+            }
+          } else {
+            if (val == null) {
+              throw new UnsupportedOperationException("Nullable repeated float is not supported.");
+            }
+
+            RepeatedFloat4Vector repeatedFloat4 = (RepeatedFloat4Vector) holder.getValueVector();
+            RepeatedFloat4Vector.Mutator m = repeatedFloat4.getMutator();
+            holder.setGroupCount(index);
+            m.add(index, (Float) val);
+          }
+          return holder.hasEnoughSpace(NullableFloat4Holder.WIDTH * 8 + 1);
+        }
+        case VARCHAR: {
+          if (val == null) {
+            // Nothing is written for a null string, and the holder's length is not incremented.
+            return (index + 1) * 4 <= holder.getLength();
+          } else {
+            byte[] bytes = ((String) val).getBytes(UTF_8);
+            int length = bytes.length;
+            holder.incAndCheckLength(length);
+            if (groupCount == 0) {
+              NullableVarCharVector varLen4 = (NullableVarCharVector) holder.getValueVector();
+              NullableVarCharVector.Mutator m = varLen4.getMutator();
+              m.set(index, bytes);
+            } else {
+              RepeatedVarCharVector repeatedVarLen4 = (RepeatedVarCharVector) holder.getValueVector();
+              RepeatedVarCharVector.Mutator m = repeatedVarLen4.getMutator();
+              holder.setGroupCount(index);
+              m.add(index, bytes);
+            }
+            return holder.hasEnoughSpace(length + 4 + 1);
+          }
+        }
+        case BIT: {
+          holder.incAndCheckLength(NullableBitHolder.WIDTH + 1);
+          if (groupCount == 0) {
+            if (val != null) {
+              NullableBitVector bit = (NullableBitVector) holder.getValueVector();
+              NullableBitVector.Mutator m = bit.getMutator();
+              m.set(index, (Boolean) val ? 1 : 0);
+            }
+          } else {
+            if (val == null) {
+              throw new UnsupportedOperationException("Nullable repeated boolean is not supported.");
+            }
+
+            RepeatedBitVector repeatedBit = (RepeatedBitVector) holder.getValueVector();
+            RepeatedBitVector.Mutator m = repeatedBit.getMutator();
+            holder.setGroupCount(index);
+            m.add(index, (Boolean) val ? 1 : 0);
+          }
+          return holder.hasEnoughSpace(NullableBitHolder.WIDTH + 1);
+        }
+        default:
+          throw new DrillRuntimeException("Type not supported to add value. Type: " + minorType);
+      }
+    }
+
+    /** Delegates to the reader's own getOrCreateVectorHolder for the given field. */
+    private VectorHolder getOrCreateVectorHolder(JSONRecordReader reader, Field field) throws SchemaChangeException {
+      return reader.getOrCreateVectorHolder(field);
+    }
+
+    /** Creates the schema used when reading a nested structure of this read type. */
+    public abstract RecordSchema createSchema() throws IOException;
+
+    /**
+     * Creates the schema field for a value that is not yet present in the parent schema.
+     *
+     * @param parentSchema    schema the field will belong to
+     * @param prefixFieldName name prefix of the enclosing structure, if any
+     * @param fieldName       name of the field as reported by the parser
+     * @param fieldType       type of the field
+     * @param index           column index of the value within its structure
+     */
+    public abstract Field createField(RecordSchema parentSchema,
+                                      String prefixFieldName,
+                                      String fieldName,
+                                      MajorType fieldType,
+                                      int index);
+  }
+
+  /** Registers a newly created field with the diff schema for the current batch. */
+  private void recordNewField(Field field) {
+    diffSchema.recordNewField(field);
+  }
+
+  /**
+   * Returns the vector holder registered under the field's full name, creating the vector
+   * and holder on first use.
+   *
+   * A new vector is allocated, wrapped in a holder, stored in the holder map and added to
+   * the output mutator. No holder is created for MAP or LATE typed fields.
+   *
+   * @return the existing or newly created holder, or null for a MAP or LATE typed field
+   *         that has no holder yet
+   */
+  private VectorHolder getOrCreateVectorHolder(Field field) throws SchemaChangeException {
+    final String key = field.getFullFieldName();
+
+    final VectorHolder existing = valueVectorMap.get(key);
+    if (existing != null) {
+      return existing;
+    }
+
+    final MajorType majorType = field.getFieldType();
+    final MinorType minor = majorType.getMinorType();
+    final boolean hasNoVector = minor.equals(MinorType.MAP) || minor.equals(MinorType.LATE);
+    if (hasNoVector) {
+      return null;
+    }
+
+    final MaterializedField materialized =
+        MaterializedField.create(new SchemaPath(key, ExpressionPosition.UNKNOWN), majorType);
+    final ValueVector vector = TypeHelper.getNewVector(materialized, allocator);
+    AllocationHelper.allocate(vector, batchSize, 50);
+
+    final VectorHolder created = new VectorHolder(vector);
+    valueVectorMap.put(key, created);
+    outputMutator.addField(vector);
+    return created;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2884db7a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONScanBatchCreator.java
new file mode 100644
index 0000000..eda6b75
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONScanBatchCreator.java
@@ -0,0 +1,46 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.store.json;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+
+import java.util.List;
+
+/**
+ * Creates the scan batch for a {@link JSONSubScan}: one {@link JSONRecordReader} per read
+ * entry, handed to a {@link ScanBatch} as an iterator.
+ */
+public class JSONScanBatchCreator implements BatchCreator<JSONSubScan> {
+
+    /**
+     * @param context  fragment context passed to every reader and to the scan batch
+     * @param config   sub scan whose read entries are turned into readers
+     * @param children must be empty
+     * @throws IllegalArgumentException if any child batches are supplied
+     */
+    @Override
+    public RecordBatch getBatch(FragmentContext context, JSONSubScan config, List<RecordBatch> children) throws ExecutionSetupException {
+        // This creator expects no child batches.
+        Preconditions.checkArgument(children.isEmpty());
+
+        final List<JSONGroupScan.ScanEntry> scanEntries = config.getReadEntries();
+        final List<RecordReader> jsonReaders = Lists.newArrayList();
+        for (final JSONGroupScan.ScanEntry entry : scanEntries) {
+            final String url = entry.getUrl();
+            jsonReaders.add(new JSONRecordReader(context, url));
+        }
+
+        return new ScanBatch(context, jsonReaders.iterator());
+    }
+}