You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 03:44:49 UTC

[11/27] git commit: Initial Parquet commit. Supports INT, LONG, FLOAT, DOUBLE, distributed scheduling.

Initial Parquet commit.  Supports INT, LONG, FLOAT, DOUBLE, distributed scheduling.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/0a2f997f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/0a2f997f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/0a2f997f

Branch: refs/heads/master
Commit: 0a2f997ff8d95b816238edc78f8ccf4c5cbbb924
Parents: 0a327ed
Author: Jason Altekruse <al...@gmial.com>
Authored: Thu Aug 8 11:50:47 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Aug 15 16:57:08 2013 -0700

----------------------------------------------------------------------
 .../org/apache/drill/common/JSONOptions.java    |   5 +-
 .../drill/common/logical/LogicalPlan.java       |   2 +-
 .../common/logical/StorageEngineConfig.java     |   2 +
 .../common/logical/StorageEngineConfigBase.java |   2 +
 .../drill/storage/MockStorageEngineConfig.java  |  18 +-
 sandbox/prototype/exec/java-exec/pom.xml        |  67 ++-
 .../templates/VariableLengthVectors.java        |   6 +-
 .../org/apache/drill/exec/ExecConstants.java    |   1 +
 .../exec/exception/OptimizerException.java      |  17 +-
 .../org/apache/drill/exec/ops/QueryContext.java |   7 +
 .../apache/drill/exec/opt/BasicOptimizer.java   | 205 ++++---
 .../drill/exec/physical/ReadEntryFromHDFS.java  |  54 ++
 .../drill/exec/physical/ReadEntryWithPath.java  |  41 ++
 .../exec/physical/base/AbstractGroupScan.java   |  45 ++
 .../physical/base/AbstractPhysicalVisitor.java  |   9 +-
 .../drill/exec/physical/base/AbstractScan.java  |  84 ---
 .../drill/exec/physical/base/GroupScan.java     |  36 ++
 .../apache/drill/exec/physical/base/Leaf.java   |   2 +-
 .../exec/physical/base/PhysicalOperator.java    |   2 +-
 .../exec/physical/base/PhysicalVisitor.java     |   3 +-
 .../apache/drill/exec/physical/base/Scan.java   |  21 +-
 .../drill/exec/physical/base/SubScan.java       |  23 +
 .../exec/physical/config/MockGroupScanPOP.java  | 221 +++++++
 .../exec/physical/config/MockRecordReader.java  |   4 +-
 .../physical/config/MockScanBatchCreator.java   |   6 +-
 .../drill/exec/physical/config/MockScanPOP.java | 193 ------
 .../exec/physical/config/MockStorageEngine.java |   8 +-
 .../exec/physical/config/MockSubScanPOP.java    | 115 ++++
 .../drill/exec/physical/impl/ImplCreator.java   |  45 +-
 .../drill/exec/physical/impl/OutputMutator.java |   1 +
 .../drill/exec/physical/impl/ScanBatch.java     |   6 +
 .../exec/physical/impl/SingleSenderCreator.java |   3 +
 .../drill/exec/planner/PhysicalPlanReader.java  |  14 +-
 .../planner/fragment/MakeFragmentsVisitor.java  |   9 +-
 .../exec/planner/fragment/Materializer.java     |  17 +-
 .../planner/fragment/SimpleParallelizer.java    |   2 +
 .../exec/planner/fragment/StatsCollector.java   |  19 +-
 .../drill/exec/planner/fragment/Wrapper.java    |  20 +-
 .../exec/record/FragmentWritableBatch.java      |   6 +
 .../drill/exec/record/RecordBatchLoader.java    |  38 +-
 .../apache/drill/exec/record/WritableBatch.java |   1 -
 .../org/apache/drill/exec/rpc/RpcEncoder.java   |  12 +-
 .../drill/exec/server/DrillbitContext.java      |  10 +-
 .../drill/exec/service/ServiceEngine.java       |   5 +-
 .../drill/exec/store/AbstractStorageEngine.java |   4 +-
 .../drill/exec/store/AffinityCalculator.java    | 112 ++++
 .../apache/drill/exec/store/StorageEngine.java  |  12 +-
 .../drill/exec/store/StorageEngineRegistry.java |   6 +-
 .../apache/drill/exec/store/VectorHolder.java   |   9 +-
 .../drill/exec/store/parquet/BitReader.java     |  87 +++
 .../drill/exec/store/parquet/ColumnReader.java  | 115 ++++
 .../store/parquet/FixedByteAlignedReader.java   |  48 ++
 .../exec/store/parquet/PageReadStatus.java      | 116 ++++
 .../exec/store/parquet/ParquetGroupScan.java    | 357 +++++++++++
 .../exec/store/parquet/ParquetRecordReader.java | 403 +++++++++++++
 .../exec/store/parquet/ParquetRowGroupScan.java | 137 +++++
 .../store/parquet/ParquetScanBatchCreator.java  |  73 +++
 .../store/parquet/ParquetStorageEngine.java     | 116 ++++
 .../parquet/ParquetStorageEngineConfig.java     |  66 +++
 .../exec/store/parquet/VarLenBinaryReader.java  | 130 ++++
 .../drill/exec/vector/BaseDataValueVector.java  |   6 +-
 .../work/AbstractFragmentRunnerListener.java    |   4 +-
 .../exec/work/RemoteFragmentRunnerListener.java |  45 ++
 .../work/RemotingFragmentRunnerListener.java    |  48 --
 .../exec/work/batch/BitComHandlerImpl.java      |   6 +-
 .../apache/drill/exec/work/foreman/Foreman.java |   2 +-
 .../work/fragment/RemoteFragmentHandler.java    |   4 +-
 .../parquet/hadoop/CodecFactoryExposer.java     |  42 ++
 .../src/main/resources/drill-module.conf        |   7 +-
 .../impl/TestDistributedFragmentRun.java        |   5 +-
 .../apache/drill/exec/store/ByteArrayUtil.java  | 181 ++++++
 .../drill/exec/store/JSONRecordReaderTest.java  |   5 +
 .../apache/drill/exec/store/MockScantTest.java  | 115 ++++
 .../exec/store/ParquetRecordReaderTest.java     | 594 +++++++++++++++++++
 .../exec/store/TestAffinityCalculator.java      | 229 +++++++
 .../exec/store/TestParquetPhysicalPlan.java     |  56 ++
 .../src/test/resources/drill-module.conf        |   4 +-
 .../src/test/resources/filter/test1.json        |   2 +-
 .../test/resources/functions/float4Equal.json   |   2 +-
 .../resources/functions/float4GreaterThan.json  |   2 +-
 .../functions/float4GreaterThanEqual.json       |   2 +-
 .../resources/functions/float4LessThan.json     |   2 +-
 .../functions/float4LessThanEqual.json          |   2 +-
 .../resources/functions/float4NotEqual.json     |   2 +-
 .../test/resources/functions/float8Equal.json   |   2 +-
 .../resources/functions/float8GreaterThan.json  |   2 +-
 .../functions/float8GreaterThanEqual.json       |   2 +-
 .../resources/functions/float8LessThan.json     |   2 +-
 .../functions/float8LessThanEqual.json          |   2 +-
 .../resources/functions/float8NotEqual.json     |   2 +-
 .../src/test/resources/functions/intEqual.json  |   2 +-
 .../resources/functions/intGreaterThan.json     |   2 +-
 .../functions/intGreaterThanEqual.json          |   2 +-
 .../test/resources/functions/intLessThan.json   |   2 +-
 .../resources/functions/intLessThanEqual.json   |   2 +-
 .../test/resources/functions/intNotEqual.json   |   2 +-
 .../src/test/resources/functions/longEqual.json |   2 +-
 .../resources/functions/longGreaterThan.json    |   2 +-
 .../functions/longGreaterThanEqual.json         |   2 +-
 .../test/resources/functions/longLessThan.json  |   2 +-
 .../resources/functions/longLessThanEqual.json  |   2 +-
 .../test/resources/functions/longNotEqual.json  |   2 +-
 .../functions/nullableBigIntEqual.json          |   2 +-
 .../functions/nullableBigIntGreaterThan.json    |   2 +-
 .../nullableBigIntGreaterThanEqual.json         |   2 +-
 .../functions/nullableBigIntLessThan.json       |   2 +-
 .../functions/nullableBigIntLessThanEqual.json  |   2 +-
 .../functions/nullableBigIntNotEqual.json       |   2 +-
 .../resources/functions/nullableIntEqual.json   |   2 +-
 .../functions/nullableIntGreaterThan.json       |   2 +-
 .../functions/nullableIntGreaterThanEqual.json  |   2 +-
 .../functions/nullableIntLessThan.json          |   2 +-
 .../functions/nullableIntLessThanEqual.json     |   2 +-
 .../functions/nullableIntNotEqual.json          |   2 +-
 .../resources/functions/testByteSubstring.json  |   2 +-
 .../test/resources/functions/testIsNotNull.json |   2 +-
 .../test/resources/functions/testIsNull.json    |   2 +-
 .../test/resources/functions/testSubstring.json |   2 +-
 .../functions/testSubstringNegative.json        |   2 +-
 .../java-exec/src/test/resources/mock-scan.json |  31 +
 .../src/test/resources/parquet_scan_screen.json |  44 ++
 .../parquet_scan_union_screen_physical.json     |  35 ++
 .../src/test/resources/physical_repeated_1.json |   2 +-
 .../src/test/resources/project/test1.json       |   2 +-
 .../src/test/resources/remover/test1.json       |   2 +-
 .../src/test/resources/sort/one_key_sort.json   |   2 +-
 .../src/test/resources/sort/two_key_sort.json   |   2 +-
 .../prototype/exec/java-exec/src/test/sh/runbit |   2 +-
 .../org/apache/drill/exec/ref/ROPConverter.java |   5 +-
 .../apache/drill/exec/ref/rse/ClasspathRSE.java |   5 +
 .../apache/drill/exec/ref/rse/ConsoleRSE.java   |   8 +-
 .../drill/exec/ref/rse/FileSystemRSE.java       |  17 +
 .../org/apache/drill/exec/ref/rse/QueueRSE.java |  19 +-
 .../java/org/apache/drill/optiq/DrillScan.java  |   2 +-
 134 files changed, 4196 insertions(+), 617 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/common/src/main/java/org/apache/drill/common/JSONOptions.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/JSONOptions.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/JSONOptions.java
index 8a185a4..d091e17 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/JSONOptions.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/JSONOptions.java
@@ -65,7 +65,10 @@ public class JSONOptions {
   public <T> T getListWith(DrillConfig config, TypeReference<T> t) throws IOException {
       ObjectMapper mapper = config.getMapper();
       return mapper.treeAsTokens(root).readValueAs(t);
-     // return mapper.treeToValue(root,  mapper.getTypeFactory().constructCollectionType(List.class, c));
+  }
+
+  public <T> T getListWith(ObjectMapper mapper, TypeReference<T> t) throws IOException {
+    return mapper.treeAsTokens(root).readValueAs(t);
   }
   
   public JsonNode path(String name){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java
index 05fbd1f..742001a 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java
@@ -63,7 +63,7 @@ public class LogicalPlan {
     return GraphAlgos.TopoSorter.sortLogical(graph);
   }
 
-  public StorageEngineConfig getStorageEngine(String name) {
+  public StorageEngineConfig getStorageEngineConfig(String name) {
     return storageEngineMap.get(name);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfig.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfig.java
index 3a893d6..b73a2c1 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfig.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfig.java
@@ -23,4 +23,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property="type")
 public interface StorageEngineConfig{
+
+  public boolean equals(Object o);
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfigBase.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfigBase.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfigBase.java
index 853196c..51dbef3 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfigBase.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfigBase.java
@@ -35,5 +35,7 @@ public abstract class StorageEngineConfigBase implements StorageEngineConfig{
     logger.debug("Adding Storage Engine Configs including {}", (Object) sec );
     return sec;
   }
+
+  public abstract boolean equals(Object o);
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/common/src/test/java/org/apache/drill/storage/MockStorageEngineConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/test/java/org/apache/drill/storage/MockStorageEngineConfig.java b/sandbox/prototype/common/src/test/java/org/apache/drill/storage/MockStorageEngineConfig.java
index bc24b2e..5843c38 100644
--- a/sandbox/prototype/common/src/test/java/org/apache/drill/storage/MockStorageEngineConfig.java
+++ b/sandbox/prototype/common/src/test/java/org/apache/drill/storage/MockStorageEngineConfig.java
@@ -39,5 +39,21 @@ public class MockStorageEngineConfig extends StorageEngineConfigBase{
     return url;
   }
 
-  
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    MockStorageEngineConfig that = (MockStorageEngineConfig) o;
+
+    if (url != null ? !url.equals(that.url) : that.url != null) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return url != null ? url.hashCode() : 0;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/pom.xml b/sandbox/prototype/exec/java-exec/pom.xml
index 4b13952..cd9bc9a 100644
--- a/sandbox/prototype/exec/java-exec/pom.xml
+++ b/sandbox/prototype/exec/java-exec/pom.xml
@@ -1,7 +1,7 @@
 <?xml version="1.0"?>
 <project
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
-  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+    xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <artifactId>exec-parent</artifactId>
@@ -46,7 +46,12 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-column</artifactId>
-      <version>1.0.0</version>
+      <version>1.0.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>parquet-hadoop</artifactId>
+      <version>1.0.1-SNAPSHOT</version>
     </dependency>
     <dependency>
       <groupId>com.yammer.metrics</groupId>
@@ -101,16 +106,57 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-core</artifactId>
-      <version>1.1.0</version>
+      <version>1.0.3-mapr-2.1.2.1</version>
       <exclusions>
         <exclusion>
           <artifactId>jets3t</artifactId>
           <groupId>net.java.dev.jets3t</groupId>
         </exclusion>
         <exclusion>
+          <artifactId>log4j</artifactId>
+          <groupId>log4j</groupId>
+        </exclusion>
+
+        <exclusion>
+          <artifactId>mockito-all</artifactId>
+          <groupId>org.mockito</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>commons-logging-api</artifactId>
+          <groupId>commons-logging</groupId>
+        </exclusion>
+        <exclusion>
           <artifactId>commons-logging</artifactId>
           <groupId>commons-logging</groupId>
         </exclusion>
+        <exclusion>
+          <artifactId>slf4j-log4j12</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>servlet-api-2.5</artifactId>
+          <groupId>org.mortbay.jetty</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>jasper-runtime</artifactId>
+          <groupId>tomcat</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>jasper-compiler</artifactId>
+          <groupId>tomcat</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>jetty</artifactId>
+          <groupId>org.mortbay.jetty</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>jersey-server</artifactId>
+          <groupId>com.sun.jersey</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>core</artifactId>
+          <groupId>org.eclipse.jdt</groupId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -191,13 +237,13 @@
                   </fileset>
                 </path>
                 <pathconvert pathsep=" " property="proto.files"
-                  refid="proto.path.files" />
+                             refid="proto.path.files" />
 
                 <exec executable="protoc">
                   <arg value="--java_out=${target.gen.source.path}" />
                   <arg value="--proto_path=${proto.cas.path}" />
                   <arg
-                    value="--proto_path=${project.basedir}/../../common/src/main/protobuf/" />
+                      value="--proto_path=${project.basedir}/../../common/src/main/protobuf/" />
                   <arg line="${proto.files}" />
                 </exec>
               </tasks>
@@ -244,4 +290,13 @@
     </plugins>
   </build>
 
+
+  <repositories>
+    <repository>
+      <id>mapr-releases</id>
+      <url>http://repository.mapr.com/maven/</url>
+      <snapshots><enabled>false</enabled></snapshots>
+      <releases><enabled>true</enabled></releases>
+    </repository>
+  </repositories>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
index 061234c..7ceafe4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
@@ -21,11 +21,9 @@ import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
 import org.apache.drill.exec.record.DeadBuf;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.TransferPair;
-import org.mortbay.jetty.servlet.Holder;
 
 import com.google.common.base.Charsets;
 
-import antlr.collections.impl.Vector;
 
 /**
  * ${minor.class}Vector implements a vector of variable width values.  Elements in the vector
@@ -197,6 +195,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     public int getValueCount() {
       return valueCount;
     }
+
+    public UInt${type.width}Vector getOffsetVector(){
+      return offsetVector;
+    }
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 4eb0f4c..5e7ddf0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -30,4 +30,5 @@ public interface ExecConstants {
   public static final String INITIAL_USER_PORT = "drill.exec.rpc.user.port";
   public static final String METRICS_CONTEXT_NAME = "drill.exec.metrics.context";
   public static final String FUNCTION_PACKAGES = "drill.exec.functions";
+  public static final String USE_IP_ADDRESS = "drill.exec.rpc.use.ip";
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/OptimizerException.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/OptimizerException.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/OptimizerException.java
index c57ce4a..ac56afb 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/OptimizerException.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/OptimizerException.java
@@ -1,14 +1,13 @@
 package org.apache.drill.exec.exception;
 
-/**
- * Created with IntelliJ IDEA.
- * User: jaltekruse
- * Date: 6/11/13
- * Time: 5:37 PM
- * To change this template use File | Settings | File Templates.
- */
-public class OptimizerException extends Exception{
-    public OptimizerException(String s) {
+import org.apache.drill.common.exceptions.DrillException;
+
+public class OptimizerException extends DrillException {
+  public OptimizerException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public OptimizerException(String s) {
         super(s);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 1c251b8..44117ff 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -19,12 +19,15 @@ package org.apache.drill.exec.ops;
 
 import java.util.Collection;
 
+import org.apache.drill.common.logical.StorageEngineConfig;
 import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.exception.SetupException;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.rpc.bit.BitCom;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.StorageEngine;
 
 public class QueryContext {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
@@ -46,6 +49,10 @@ public class QueryContext {
     return queryId;
   }
 
+  public StorageEngine getStorageEngine(StorageEngineConfig config) throws SetupException {
+    return drillbitContext.getStorageEngine(config);
+  }
+
   public DistributedCache getCache(){
     return drillbitContext.getCache();
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index 289ec4b..c4a7e43 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -2,22 +2,28 @@ package org.apache.drill.exec.opt;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 
 import org.apache.drill.common.PlanProperties;
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.expression.*;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.FunctionDefinition;
+import org.apache.drill.common.expression.NoArgValidator;
+import org.apache.drill.common.expression.OutputTypeDeterminer;
 import org.apache.drill.common.logical.LogicalPlan;
-import org.apache.drill.common.logical.data.*;
 import org.apache.drill.common.logical.data.Filter;
 import org.apache.drill.common.logical.data.Project;
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.common.logical.data.SinkOperator;
+import org.apache.drill.common.logical.data.Store;
 import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.exception.OptimizerException;
+import org.apache.drill.exec.exception.SetupException;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -27,112 +33,127 @@ import com.fasterxml.jackson.core.type.TypeReference;
 
 public class BasicOptimizer extends Optimizer{
 
-    private DrillConfig config;
-    private QueryContext context;
-
-    public BasicOptimizer(DrillConfig config, QueryContext context){
-        this.config = config;
-        this.context = context;
+  private DrillConfig config;
+  private QueryContext context;
+
+  public BasicOptimizer(DrillConfig config, QueryContext context){
+    this.config = config;
+    this.context = context;
+  }
+
+  @Override
+  public void init(DrillConfig config) {
+
+  }
+
+  @Override
+  public PhysicalPlan optimize(OptimizationContext context, LogicalPlan plan) {
+    Object obj = new Object();
+    Collection<SinkOperator> roots = plan.getGraph().getRoots();
+    List<PhysicalOperator> physOps = new ArrayList<PhysicalOperator>(roots.size());
+    LogicalConverter converter = new LogicalConverter(plan);
+    for ( SinkOperator op : roots){
+      try {
+        PhysicalOperator pop  = op.accept(converter, obj);
+        System.out.println(pop);
+        physOps.add(pop);
+      } catch (OptimizerException e) {
+        e.printStackTrace();
+      } catch (Throwable throwable) {
+        throwable.printStackTrace();
+      }
     }
 
-    @Override
-    public void init(DrillConfig config) {
+    PlanProperties props = new PlanProperties();
+    props.type = PlanProperties.PlanType.APACHE_DRILL_PHYSICAL;
+    props.version = plan.getProperties().version;
+    props.generator = plan.getProperties().generator;
+    PhysicalPlan p = new PhysicalPlan(props, physOps);
+    return p;
+    //return new PhysicalPlan(props, physOps);
+  }
 
-    }
+  @Override
+  public void close() {
 
-    @Override
-    public PhysicalPlan optimize(OptimizationContext context, LogicalPlan plan) {
-        Object obj = new Object();
-        Collection<SinkOperator> roots = plan.getGraph().getRoots();
-        List<PhysicalOperator> physOps = new ArrayList<PhysicalOperator>(roots.size());
-        LogicalConverter converter = new LogicalConverter();
-        for ( SinkOperator op : roots){
-            try {
-                PhysicalOperator pop  = op.accept(converter, obj);
-                System.out.println(pop);
-                physOps.add(pop);
-            } catch (OptimizerException e) {
-                e.printStackTrace();
-            } catch (Throwable throwable) {
-                throwable.printStackTrace();
-            }
-        }
+  }
 
-        PlanProperties props = new PlanProperties();
-        props.type = PlanProperties.PlanType.APACHE_DRILL_PHYSICAL;
-        props.version = plan.getProperties().version;
-        props.generator = plan.getProperties().generator;
-        return new PhysicalPlan(props, physOps);
-    }
+  public static class BasicOptimizationContext implements OptimizationContext {
 
     @Override
-    public void close() {
-
+    public int getPriority() {
+      return 1;
     }
+  }
 
-    public static class BasicOptimizationContext implements OptimizationContext {
+  private class LogicalConverter extends AbstractLogicalVisitor<PhysicalOperator, Object, OptimizerException> {
 
-        @Override
-        public int getPriority() {
-            return 1;
-        }
+    // storing a reference to the plan for access to other elements outside of the query graph
+    // such as the storage engine configs
+    LogicalPlan logicalPlan;
+
+    public LogicalConverter(LogicalPlan logicalPlan){
+      this.logicalPlan = logicalPlan;
     }
 
 
-    private class LogicalConverter extends AbstractLogicalVisitor<PhysicalOperator, Object, OptimizerException> {
-
-        @Override
-        public MockScanPOP visitScan(Scan scan, Object obj) throws OptimizerException {
-            List<MockScanPOP.MockScanEntry> myObjects;
-
-            try {
-                if ( scan.getStorageEngine().equals("local-logs")){
-                    myObjects = scan.getSelection().getListWith(config,
-                            new TypeReference<ArrayList<MockScanPOP.MockScanEntry>>() {
-                    });
-                }
-                else{
-                    myObjects = new ArrayList<>();
-                    MockScanPOP.MockColumn[] cols = {
-                        new MockScanPOP.MockColumn("RED", MinorType.BIGINT, DataMode.REQUIRED, null, null, null),
-                        new MockScanPOP.MockColumn("GREEN", MinorType.BIGINT, DataMode.REQUIRED,null, null, null)
-                    };
-                    myObjects.add(new MockScanPOP.MockScanEntry(100, cols));
-                }
-            } catch (IOException e) {
-                e.printStackTrace();
-                throw new OptimizerException("Error reading selection attribute of Scan node in Logical to Physical plan conversion.");
-            }
-
-            return new MockScanPOP("http://apache.org", myObjects);
-        }
+    @Override
+    public PhysicalOperator visitScan(Scan scan, Object obj) throws OptimizerException {
+      List<MockGroupScanPOP.MockScanEntry> myObjects;
 
-        @Override
-        public Screen visitStore(Store store, Object obj) throws OptimizerException {
-            if ( ! store.iterator().hasNext()){
-                throw new OptimizerException("Store node in logical plan does not have a child.");
-            }
-            return new Screen(store.iterator().next().accept(this, obj), context.getCurrentEndpoint());
+      try {
+        if (scan.getStorageEngine().equals("parquet")) {
+          return context.getStorageEngine(logicalPlan.getStorageEngineConfig(scan.getStorageEngine())).getPhysicalScan(scan);
         }
-
-        @Override
-        public PhysicalOperator visitProject(Project project, Object obj) throws OptimizerException {
-          return project.getInput().accept(this, obj);
-//            return new org.apache.drill.exec.physical.config.Project(
-//                Arrays.asList(project.getSelections()), project.iterator().next().accept(this, obj));
+        if (scan.getStorageEngine().equals("local-logs")) {
+          myObjects = scan.getSelection().getListWith(config,
+              new TypeReference<ArrayList<MockGroupScanPOP.MockScanEntry>>() {
+              });
+        } else {
+          myObjects = new ArrayList<>();
+          MockGroupScanPOP.MockColumn[] cols = {
+              new MockGroupScanPOP.MockColumn("blah", MinorType.INT, DataMode.REQUIRED, 4, 4, 4),
+              new MockGroupScanPOP.MockColumn("blah_2", MinorType.INT, DataMode.REQUIRED, 4, 4, 4) };
+          myObjects.add(new MockGroupScanPOP.MockScanEntry(50, cols));
         }
+      } catch (IOException e) {
+        throw new OptimizerException(
+            "Error reading selection attribute of GroupScan node in Logical to Physical plan conversion.", e);
+      } catch (SetupException e) {
+        throw new OptimizerException(
+            "Storage engine not found: " + scan.getStorageEngine(), e);
+      }
+
+      return new MockGroupScanPOP("http://apache.org", myObjects);
+    }
 
-      @Override
-      public PhysicalOperator visitFilter(Filter filter, Object obj) throws OptimizerException {
-        TypeProtos.MajorType.Builder b = TypeProtos.MajorType.getDefaultInstance().newBuilderForType();
-        b.setMode(DataMode.REQUIRED);
-        b.setMinorType(MinorType.BIGINT);
-
-        return new SelectionVectorRemover(new org.apache.drill.exec.physical.config.Filter(
-            filter.iterator().next().accept(this, obj), /*filter.getExpr() */
-            new FunctionCall(FunctionDefinition.simple("alternate", new NoArgValidator(),
-                new OutputTypeDeterminer.FixedType(b.build())), null, new ExpressionPosition("asdf", 1)),
-            1.0f));
+    @Override
+    public PhysicalOperator visitStore(Store store, Object obj) throws OptimizerException {
+      if (!store.iterator().hasNext()) {
+        throw new OptimizerException("Store node in logical plan does not have a child.");
       }
+      return new Screen(store.iterator().next().accept(this, obj), context.getCurrentEndpoint());
+    }
+
+    @Override
+    public PhysicalOperator visitProject(Project project, Object obj) throws OptimizerException {
+      return project.getInput().accept(this, obj);
+      // return new org.apache.drill.exec.physical.config.Project(
+      // Arrays.asList(project.getSelections()), project.iterator().next().accept(this, obj));
+    }
+
+    @Override
+    public PhysicalOperator visitFilter(Filter filter, Object obj) throws OptimizerException {
+      TypeProtos.MajorType.Builder b = TypeProtos.MajorType.getDefaultInstance().newBuilderForType();
+      b.setMode(DataMode.REQUIRED);
+      b.setMinorType(MinorType.BIGINT);
+
+      return new SelectionVectorRemover(new org.apache.drill.exec.physical.config.Filter(filter.iterator().next()
+          .accept(this, obj), /* filter.getExpr() */
+      new FunctionCall(FunctionDefinition.simple("alternate", new NoArgValidator(), new OutputTypeDeterminer.FixedType(
+          b.build())), null, new ExpressionPosition("asdf", 1)), 1.0f));
     }
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/ReadEntryFromHDFS.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/ReadEntryFromHDFS.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/ReadEntryFromHDFS.java
new file mode 100644
index 0000000..b90f1bc
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/ReadEntryFromHDFS.java
@@ -0,0 +1,54 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.exec.physical.base.Size;
+
+/**
+ * A path-based read entry that additionally records a start and a length.
+ *
+ * Instances are created by Jackson from the "path", "start" and "length" JSON properties. Unlike its
+ * superclass ReadEntryWithPath, this class returns values from getCost() and getSize() instead of throwing.
+ */
+public class ReadEntryFromHDFS extends ReadEntryWithPath {
+
+  // NOTE(review): presumably a byte offset into the file at path and a byte count - confirm against the reader.
+  private long start;
+  private long length;
+
+  /**
+   * @param path   stored in the protected path field inherited from ReadEntryWithPath
+   * @param start  value returned by getStart()
+   * @param length value returned by getLength()
+   */
+  @JsonCreator
+  public ReadEntryFromHDFS(@JsonProperty("path") String path,@JsonProperty("start") long start, @JsonProperty("length") long length) {
+    this.path = path;
+    this.start = start;
+    this.length = length;
+  }
+
+  /** Returns a fixed OperatorCost(1, 2, 1, 1); the values do not depend on path, start or length. */
+  @Override
+  public OperatorCost getCost() {
+    return new OperatorCost(1, 2, 1, 1);
+  }
+
+  /** Returns a fixed placeholder Size(10, 10); see the TODO below. */
+  @Override
+  public Size getSize() {
+    // TODO - these values are placeholders: the real size cannot be known until the file has been read
+    return new Size(10, 10);
+  }
+
+  public long getStart() {
+    return start;
+  }
+
+  public long getLength() {
+    return length;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/ReadEntryWithPath.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/ReadEntryWithPath.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/ReadEntryWithPath.java
new file mode 100644
index 0000000..57d1d0b
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/ReadEntryWithPath.java
@@ -0,0 +1,41 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical;
+
+import org.apache.drill.exec.physical.base.Size;
+
+/**
+ * A ReadEntry that only carries a path.
+ *
+ * As the exception messages below state, it exists to extract path data from the selection of a logical
+ * scan node and is not meant for an executing plan: getCost() and getSize() always throw
+ * UnsupportedOperationException here. Subclasses may override them.
+ */
+public class ReadEntryWithPath implements ReadEntry {
+
+  // No constructor or setter in this class assigns the field; it is protected so subclasses can set it.
+  protected String path;
+
+  public String getPath(){
+   return path;
+  }
+
+  /**
+   * Not supported by this class.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public OperatorCost getCost() {
+    throw new UnsupportedOperationException(this.getClass().getCanonicalName() + " is only for extracting path data from " +
+        "selections inside a scan node from a logical plan, it cannot be used in an executing plan and has no cost.");
+  }
+
+  /**
+   * Not supported by this class.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public Size getSize() {
+    throw new UnsupportedOperationException(this.getClass().getCanonicalName() + " is only for extracting path data from " +
+        "selections on a scan node from a logical plan, it cannot be used in an executing plan and has no size.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
new file mode 100644
index 0000000..9691f08
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+import java.util.Iterator;
+
+import org.apache.drill.exec.physical.ReadEntry;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * Base class for GroupScan implementations.
+ *
+ * Supplies the parts that are the same for every group scan here: an empty child iterator, a false
+ * isExecutable() and visitor dispatch to visitGroupScan. Everything else is left to subclasses.
+ */
+public abstract class AbstractGroupScan extends AbstractBase implements GroupScan {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractGroupScan.class);
+
+  
+  /** Returns an empty iterator: this operator exposes no child operators. */
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Iterators.emptyIterator();
+  }
+
+  /** Always false for a group scan. */
+  @Override
+  public boolean isExecutable() {
+    return false;
+  }
+
+  /** Dispatches to PhysicalVisitor.visitGroupScan with this operator and the given extra value. */
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
+    return physicalVisitor.visitGroupScan(this, value);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index f782325..3b58803 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -62,8 +62,13 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
   }
 
   @Override
-  public T visitScan(Scan<?> scan, X value) throws E{
-    return visitOp(scan, value);
+  public T visitGroupScan(GroupScan groupScan, X value) throws E{
+    return visitOp(groupScan, value);
+  }
+
+  @Override
+  public T visitSubScan(SubScan subScan, X value) throws E{
+    return visitOp(subScan, value);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractScan.java
deleted file mode 100644
index dbde9c5..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractScan.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.physical.base;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.ReadEntry;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Iterators;
-
-public abstract class AbstractScan<R extends ReadEntry> extends AbstractBase implements Scan<R>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractScan.class);
-  
-  protected final List<R> readEntries;
-  private final OperatorCost cost;
-  private final Size size;
-  
-  public AbstractScan(List<R> readEntries) {
-    this.readEntries = readEntries;
-    OperatorCost cost = new OperatorCost(0,0,0,0);
-    Size size = new Size(0,0);
-    for(R r : readEntries){
-      cost = cost.add(r.getCost());
-      size = size.add(r.getSize());
-    }
-    this.cost = cost;
-    this.size = size;
-  }
-
-  @Override
-  @JsonProperty("entries")
-  public List<R> getReadEntries() {
-    return readEntries;
-  }
-  
-  @Override
-  public Iterator<PhysicalOperator> iterator() {
-    return Iterators.emptyIterator();
-  }
-
-  @Override
-  public boolean isExecutable() {
-    return true;
-  }
-
-  @Override
-  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
-    return physicalVisitor.visitScan(this, value);
-  }
-
-  @Override
-  public OperatorCost getCost() {
-    return cost;
-  }
-
-  @Override
-  public Size getSize() {
-    return size;
-  }
-  
-  
-  
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
new file mode 100644
index 0000000..acafd6c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -0,0 +1,36 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * A Scan that also exposes affinity (HasAffinity) and can hand out a SubScan per minor fragment.
+ */
+public interface GroupScan extends Scan, HasAffinity{
+
+  /**
+   * Gives the scan the list of endpoints it has been assigned to.
+   * NOTE(review): presumably this must be called before getSpecificScan - confirm with the implementations.
+   */
+  public abstract void applyAssignments(List<DrillbitEndpoint> endpoints);
+
+  /** Returns the SubScan for the given minor fragment id. */
+  public abstract SubScan getSpecificScan(int minorFragmentId);
+
+  /**
+   * Excluded from JSON serialization via JsonIgnore.
+   * NOTE(review): presumably the largest number of fragments this scan can be split into - confirm.
+   */
+  @JsonIgnore
+  public int getMaxParallelizationWidth();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Leaf.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Leaf.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Leaf.java
index d4ed456..7764739 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Leaf.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Leaf.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.base;
 
 /**
  * An operator which specifically is a lowest level leaf node of a query plan across all possible fragments. Currently, the only operator that is a Leaf
- * node are Scan nodes. Ultimately this could include use of Cache scans and other types of atypical data production systems.
+ * node is the GroupScan node. Ultimately this could include use of Cache scans and other types of atypical data production systems.
  */
 public interface Leaf extends FragmentLeaf {
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
index d412c2d..c24836b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -34,7 +34,7 @@ import com.fasterxml.jackson.annotation.ObjectIdGenerators;
 @JsonPropertyOrder({ "@id" })
 @JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id")
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "pop")
-public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
+public interface  PhysicalOperator extends GraphValue<PhysicalOperator> {
 
   /**
    * Get the cost of execution of this particular operator.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index f36633f..8e09e3a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -39,7 +39,8 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
   
   
   public RETURN visitExchange(Exchange exchange, EXTRA value) throws EXCEP;
-  public RETURN visitScan(Scan<?> scan, EXTRA value) throws EXCEP;
+  public RETURN visitGroupScan(GroupScan groupScan, EXTRA value) throws EXCEP;
+  public RETURN visitSubScan(SubScan subScan, EXTRA value) throws EXCEP;
   public RETURN visitStore(Store store, EXTRA value) throws EXCEP;
 
   public RETURN visitFilter(Filter filter, EXTRA value) throws EXCEP;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Scan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Scan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Scan.java
index 2207f79..f56e9f9 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Scan.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Scan.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,20 +17,7 @@
  ******************************************************************************/
 package org.apache.drill.exec.physical.base;
 
-import java.util.List;
-
 import org.apache.drill.exec.physical.ReadEntry;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-public interface Scan<R extends ReadEntry> extends Leaf, HasAffinity{
-
-  @JsonProperty("entries")
-  public abstract List<R> getReadEntries();
-
-  public abstract void applyAssignments(List<DrillbitEndpoint> endpoints);
-
-  public abstract Scan<?> getSpecificScan(int minorFragmentId);
 
-}
\ No newline at end of file
+/**
+ * Marker supertype for scan operators: it extends Leaf and declares no members of its own.
+ */
+public interface Scan extends Leaf {
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SubScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SubScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SubScan.java
new file mode 100644
index 0000000..f75ba19
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SubScan.java
@@ -0,0 +1,23 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+import org.apache.drill.exec.physical.ReadEntry;
+
+/**
+ * A Scan with no additional members; this is the type returned by GroupScan.getSpecificScan for a
+ * single minor fragment.
+ */
+public interface SubScan extends Scan {
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockGroupScanPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockGroupScanPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockGroupScanPOP.java
new file mode 100644
index 0000000..a28c7d8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockGroupScanPOP.java
@@ -0,0 +1,221 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.vector.TypeHelper;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+/**
+ * Group-level mock scan operator, registered with Jackson under the type name "mock-scan".
+ *
+ * Holds a url and a list of MockScanEntry read entries, and sums their cost and size at construction time.
+ * applyAssignments distributes the entries round-robin over the supplied endpoints; getSpecificScan then
+ * returns one MockSubScanPOP per minor fragment, built from the entries assigned to that slot.
+ */
+@JsonTypeName("mock-scan")
+public class MockGroupScanPOP extends AbstractGroupScan {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockGroupScanPOP.class);
+
+  private final String url;
+  protected final List<MockScanEntry> readEntries;
+  // Sum of the cost and size of every read entry, computed once in the constructor.
+  private final OperatorCost cost;
+  private final Size size;
+  // Entry lists indexed by minor fragment id; stays null until applyAssignments is called.
+  private  LinkedList<MockScanEntry>[] mappings;
+
+  /**
+   * @param url         passed through unchanged to the sub scans and to copies of this operator
+   * @param readEntries entries to scan; iterated here, so it must not be null
+   */
+  @JsonCreator
+  public MockGroupScanPOP(@JsonProperty("url") String url, @JsonProperty("entries") List<MockScanEntry> readEntries) {
+    this.readEntries = readEntries;
+    // Start from zero and accumulate each entry's cost and size.
+    OperatorCost cost = new OperatorCost(0,0,0,0);
+    Size size = new Size(0,0);
+    for(MockScanEntry r : readEntries){
+      cost = cost.add(r.getCost());
+      size = size.add(r.getSize());
+    }
+    this.cost = cost;
+    this.size = size;
+    this.url = url;
+  }
+
+  public String getUrl() {
+    return url;
+  }
+
+  /** Returns the read entries; serialized under the JSON property "entries". */
+  @JsonProperty("entries")
+  public List<MockScanEntry> getReadEntries() {
+    return readEntries;
+  }
+  
+  /**
+   * One unit of mock data: a record count plus the column definitions of each record.
+   */
+  public static class MockScanEntry implements ReadEntry {
+
+    private final int records;
+    private final MockColumn[] types;
+    // Sum of TypeHelper.getSize over the major type of every column, computed in the constructor.
+    private final int recordSize;
+    
+
+    /**
+     * @param records value returned by getRecords() and used as the first argument of the reported Size
+     * @param types   column definitions; iterated here, so it must not be null
+     */
+    @JsonCreator
+    public MockScanEntry(@JsonProperty("records") int records, @JsonProperty("types") MockColumn[] types) {
+      this.records = records;
+      this.types = types;
+      int size = 0;
+      for(MockColumn dt : types){
+        size += TypeHelper.getSize(dt.getMajorType());
+      }
+      this.recordSize = size;
+    }
+
+    /** Returns a fixed OperatorCost(1, 2, 1, 1), independent of the entry's contents. */
+    @Override
+    public OperatorCost getCost() {
+      return new OperatorCost(1, 2, 1, 1);
+    }
+    
+    public int getRecords() {
+      return records;
+    }
+
+    public MockColumn[] getTypes() {
+      return types;
+    }
+
+    /** Returns Size(records, recordSize). */
+    @Override
+    public Size getSize() {
+      return new Size(records, recordSize);
+    }
+  }
+  
+  /**
+   * Column definition for mock data. Null properties are left out of the JSON output (Include.NON_NULL).
+   */
+  @JsonInclude(Include.NON_NULL)
+  public static class MockColumn{
+    @JsonProperty("type") public MinorType minorType;
+    public String name;
+    public DataMode mode;
+    // Optional: each of the three below is only applied to the MajorType when non-null.
+    public Integer width;
+    public Integer precision;
+    public Integer scale;
+    
+    
+    @JsonCreator
+    public MockColumn(@JsonProperty("name") String name, @JsonProperty("type") MinorType minorType, @JsonProperty("mode") DataMode mode, @JsonProperty("width") Integer width, @JsonProperty("precision") Integer precision, @JsonProperty("scale") Integer scale) {
+      this.name = name;
+      this.minorType = minorType;
+      this.mode = mode;
+      this.width = width;
+      this.precision = precision;
+      this.scale = scale;
+    }
+    
+    @JsonProperty("type")
+    public MinorType getMinorType() {
+      return minorType;
+    }
+    public String getName() {
+      return name;
+    }
+    public DataMode getMode() {
+      return mode;
+    }
+    public Integer getWidth() {
+      return width;
+    }
+    public Integer getPrecision() {
+      return precision;
+    }
+    public Integer getScale() {
+      return scale;
+    }
+    
+    /**
+     * Builds a MajorType from mode and minorType, adding precision, width and scale only when they are set.
+     * Not serialized (JsonIgnore).
+     */
+    @JsonIgnore
+    public MajorType getMajorType(){
+      MajorType.Builder b = MajorType.newBuilder();
+      b.setMode(mode);
+      b.setMinorType(minorType);
+      if(precision != null) b.setPrecision(precision);
+      if(width != null) b.setWidth(width);
+      if(scale != null) b.setScale(scale);
+      return b.build();
+    }
+    
+  }
+
+  /** Returns an empty list: the mock scan reports no endpoint affinity. */
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Distributes the read entries round-robin over endpoints.size() slots and stores the result in mappings.
+   *
+   * Requires endpoints.size() to be no larger than the number of read entries.
+   * NOTE(review): an empty endpoint list with at least one read entry passes the precondition and then
+   * indexes slot 0 of a zero-length array - confirm callers never pass an empty list.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
+    Preconditions.checkArgument(endpoints.size() <= getReadEntries().size());
+    
+    mappings = new LinkedList[endpoints.size()];
+
+    int i =0;
+    for(MockScanEntry e : this.getReadEntries()){
+      // Wrap back to the first slot once every endpoint has received an entry in this round.
+      if(i == endpoints.size()) i -= endpoints.size();
+      LinkedList<MockScanEntry> entries = mappings[i];
+      // Slot lists are created lazily on first use.
+      if(entries == null){
+        entries = new LinkedList<MockScanEntry>();
+        mappings[i] = entries;
+      }
+      entries.add(e);
+      i++;
+    }
+  }
+
+  /**
+   * Returns a MockSubScanPOP holding the entries assigned to the given minor fragment.
+   *
+   * NOTE(review): the bounds check is an assert, so it only runs when assertions are enabled, and mappings
+   * is still null if applyAssignments has not been called.
+   */
+  @Override
+  public SubScan getSpecificScan(int minorFragmentId) {
+    assert minorFragmentId < mappings.length : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.length, minorFragmentId);
+    return new MockSubScanPOP(url, mappings[minorFragmentId]);
+  }
+
+  /** Returns the number of read entries. */
+  @Override
+  public int getMaxParallelizationWidth() {
+    return readEntries.size();
+  }
+
+  /** Returns the summed cost of all read entries, as computed in the constructor. */
+  @Override
+  public OperatorCost getCost() {
+    return cost;
+  }
+
+  /** Returns the summed size of all read entries, as computed in the constructor. */
+  @Override
+  public Size getSize() {
+    return size;
+  }
+
+  /**
+   * Returns a new MockGroupScanPOP with the same url and read entries; children must be empty.
+   * The copy is built through the constructor, so its mappings start out null again.
+   */
+  @Override
+  @JsonIgnore
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new MockGroupScanPOP(url, readEntries);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
index 11b9243..bd57823 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
@@ -23,8 +23,8 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.MockScanPOP.MockColumn;
-import org.apache.drill.exec.physical.config.MockScanPOP.MockScanEntry;
+import org.apache.drill.exec.physical.config.MockGroupScanPOP.MockColumn;
+import org.apache.drill.exec.physical.config.MockGroupScanPOP.MockScanEntry;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.RecordReader;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanBatchCreator.java
index bfc19af..a06aaee 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanBatchCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanBatchCreator.java
@@ -21,7 +21,7 @@ import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.MockScanPOP.MockScanEntry;
+import org.apache.drill.exec.physical.config.MockGroupScanPOP.MockScanEntry;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
@@ -30,11 +30,11 @@ import org.apache.drill.exec.store.RecordReader;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-public class MockScanBatchCreator implements BatchCreator<MockScanPOP>{
+public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class);
 
   @Override
-  public RecordBatch getBatch(FragmentContext context, MockScanPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+  public RecordBatch getBatch(FragmentContext context, MockSubScanPOP config, List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     List<MockScanEntry> entries = config.getReadEntries();
     List<RecordReader> readers = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanPOP.java
deleted file mode 100644
index 151d541..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanPOP.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.physical.config;
-
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.ReadEntry;
-import org.apache.drill.exec.physical.base.AbstractScan;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.Scan;
-import org.apache.drill.exec.physical.base.Size;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.vector.TypeHelper;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
-
-@JsonTypeName("mock-scan")
-public class MockScanPOP extends AbstractScan<MockScanPOP.MockScanEntry> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanPOP.class);
-
-  private final String url;
-  private  LinkedList<MockScanEntry>[] mappings;
-
-  @JsonCreator
-  public MockScanPOP(@JsonProperty("url") String url, @JsonProperty("entries") List<MockScanEntry> readEntries) {
-    super(readEntries);
-    this.url = url;
-  }
-
-  public String getUrl() {
-    return url;
-  }
-
-  
-  public static class MockScanEntry implements ReadEntry {
-
-    private final int records;
-    private final MockColumn[] types;
-    private final int recordSize;
-    
-
-    @JsonCreator
-    public MockScanEntry(@JsonProperty("records") int records, @JsonProperty("types") MockColumn[] types) {
-      this.records = records;
-      this.types = types;
-      int size = 0;
-      for(MockColumn dt : types){
-        size += TypeHelper.getSize(dt.getMajorType());
-      }
-      this.recordSize = size;
-    }
-
-    @Override
-    public OperatorCost getCost() {
-      return new OperatorCost(1, 2, 1, 1);
-    }
-
-    
-    public int getRecords() {
-      return records;
-    }
-
-    public MockColumn[] getTypes() {
-      return types;
-    }
-
-    @Override
-    public Size getSize() {
-      return new Size(records, recordSize);
-    }
-  }
-  
-  @JsonInclude(Include.NON_NULL)
-  public static class MockColumn{
-    @JsonProperty("type") public MinorType minorType;
-    public String name;
-    public DataMode mode;
-    public Integer width;
-    public Integer precision;
-    public Integer scale;
-    
-    
-    @JsonCreator
-    public MockColumn(@JsonProperty("name") String name, @JsonProperty("type") MinorType minorType, @JsonProperty("mode") DataMode mode, @JsonProperty("width") Integer width, @JsonProperty("precision") Integer precision, @JsonProperty("scale") Integer scale) {
-      this.name = name;
-      this.minorType = minorType;
-      this.mode = mode;
-      this.width = width;
-      this.precision = precision;
-      this.scale = scale;
-    }
-    
-    @JsonProperty("type")
-    public MinorType getMinorType() {
-      return minorType;
-    }
-    public String getName() {
-      return name;
-    }
-    public DataMode getMode() {
-      return mode;
-    }
-    public Integer getWidth() {
-      return width;
-    }
-    public Integer getPrecision() {
-      return precision;
-    }
-    public Integer getScale() {
-      return scale;
-    }
-    
-    @JsonIgnore
-    public MajorType getMajorType(){
-      MajorType.Builder b = MajorType.newBuilder();
-      b.setMode(mode);
-      b.setMinorType(minorType);
-      if(precision != null) b.setPrecision(precision);
-      if(width != null) b.setWidth(width);
-      if(scale != null) b.setScale(scale);
-      return b.build();
-    }
-    
-  }
-
-  @Override
-  public List<EndpointAffinity> getOperatorAffinity() {
-    return Collections.emptyList();
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
-    Preconditions.checkArgument(endpoints.size() <= getReadEntries().size());
-    
-    mappings = new LinkedList[endpoints.size()];
-
-    int i =0;
-    for(MockScanEntry e : this.getReadEntries()){
-      if(i == endpoints.size()) i -= endpoints.size();
-      LinkedList<MockScanEntry> entries = mappings[i];
-      if(entries == null){
-        entries = new LinkedList<MockScanEntry>();
-        mappings[i] = entries;
-      }
-      entries.add(e);
-      i++;
-    }
-  }
-
-  @Override
-  public Scan<?> getSpecificScan(int minorFragmentId) {
-    assert minorFragmentId < mappings.length : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.length, minorFragmentId);
-    return new MockScanPOP(url, mappings[minorFragmentId]);
-  }
-
-  @Override
-  @JsonIgnore
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
-    Preconditions.checkArgument(children.isEmpty());
-    return new MockScanPOP(url, readEntries);
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java
index 0044628..6348686 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java
@@ -22,11 +22,10 @@ import java.util.Collection;
 
 import org.apache.drill.common.logical.data.Scan;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.ReadEntry;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.AbstractStorageEngine;
 import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.StorageEngine;
-import org.apache.drill.exec.store.StorageEngine.ReadEntry;
 
 import com.google.common.collect.ListMultimap;
 
@@ -39,11 +38,6 @@ public class MockStorageEngine extends AbstractStorageEngine{
   }
 
   @Override
-  public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException {
-    return null;
-  }
-
-  @Override
   public ListMultimap<ReadEntry, DrillbitEndpoint> getReadLocations(Collection<ReadEntry> entries) {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockSubScanPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockSubScanPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockSubScanPOP.java
new file mode 100644
index 0000000..7380617
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockSubScanPOP.java
@@ -0,0 +1,115 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import com.google.common.collect.Iterators;
+import org.apache.drill.common.graph.GraphVisitor;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.vector.TypeHelper;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+/** Mock SubScan operator: holds a url and the mock read entries that MockScanBatchCreator consumes. */
+@JsonTypeName("mock-sub-scan")
+public class MockSubScanPOP extends AbstractBase implements SubScan {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockSubScanPOP.class);
+
+  private final String url;
+  protected final List<MockGroupScanPOP.MockScanEntry> readEntries;
+  // Totals over readEntries, summed in the constructor; getCost()/getSize() below do not expose them.
+  private final OperatorCost cost;
+  private final Size size;
+
+  /** Jackson creator: stores url and entries and sums every entry's cost and size (entries must be non-null). */
+  @JsonCreator
+  public MockSubScanPOP(@JsonProperty("url") String url, @JsonProperty("entries") List<MockGroupScanPOP.MockScanEntry> readEntries) {
+    this.readEntries = readEntries;
+    OperatorCost cost = new OperatorCost(0,0,0,0);
+    Size size = new Size(0,0);
+    for(MockGroupScanPOP.MockScanEntry r : readEntries){
+      cost = cost.add(r.getCost());
+      size = size.add(r.getSize());
+    }
+    this.cost = cost;
+    this.size = size;
+    this.url = url;
+  }
+
+  public String getUrl() {
+    return url;
+  }
+
+  @JsonProperty("entries")
+  public List<MockGroupScanPOP.MockScanEntry> getReadEntries() {
+    return readEntries;
+  }
+
+  /** This operator exposes no child operators, so the iterator is always empty. */
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Iterators.emptyIterator();
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Size getSize() {
+    throw new UnsupportedOperationException();
+  }
+
+  // TODO: isExecutable() and accept() should move into a shared AbstractSubScan base class.
+  @Override
+  public boolean isExecutable() {
+    return true;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  @Override
+  @JsonIgnore
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new MockSubScanPOP(url, readEntries);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 1c15289..61c9383 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,6 +26,15 @@ import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Scan;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.config.MockScanBatchCreator;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.config.RandomReceiver;
+import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.config.SelectionVectorRemover;
+import org.apache.drill.exec.physical.config.SingleSender;
+import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.base.*;
 import org.apache.drill.exec.physical.config.*;
 import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator;
 import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator;
@@ -35,11 +44,15 @@ import org.apache.drill.exec.record.RecordBatch;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import org.apache.drill.exec.store.parquet.ParquetGroupScan;
+import org.apache.drill.exec.store.parquet.ParquetRowGroupScan;
+import org.apache.drill.exec.store.parquet.ParquetScanBatchCreator;
 
 public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentContext, ExecutionSetupException>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ImplCreator.class);
 
   private MockScanBatchCreator msc = new MockScanBatchCreator();
+  private ParquetScanBatchCreator parquetScan = new ParquetScanBatchCreator();
   private ScreenCreator sc = new ScreenCreator();
   private RandomReceiverCreator rrc = new RandomReceiverCreator();
   private SingleSenderCreator ssc = new SingleSenderCreator();
@@ -48,22 +61,25 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
   private SVRemoverCreator svc = new SVRemoverCreator();
   private SortBatchCreator sbc = new SortBatchCreator();
   private RootExec root = null;
-  
+
   private ImplCreator(){}
-  
+
   public RootExec getRoot(){
     return root;
   }
-  
+
   @Override
   public RecordBatch visitProject(Project op, FragmentContext context) throws ExecutionSetupException {
     return pbc.getBatch(context, op, getChildren(op, context));
   }
 
   @Override
-  public RecordBatch visitScan(Scan<?> scan, FragmentContext context) throws ExecutionSetupException {
-    Preconditions.checkNotNull(scan);
+  public RecordBatch visitSubScan(SubScan subScan, FragmentContext context) throws ExecutionSetupException {
+    Preconditions.checkNotNull(subScan);
     Preconditions.checkNotNull(context);
+
+    if(subScan instanceof MockSubScanPOP){
+      return msc.getBatch(context, (MockSubScanPOP) subScan, Collections.<RecordBatch> emptyList());
     
     if(scan instanceof MockScanPOP){
       return msc.getBatch(context, (MockScanPOP) scan, Collections.<RecordBatch>emptyList());
@@ -72,16 +88,21 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
     }else{
       return super.visitScan(scan, context);  
     }
-    
+    else if (subScan instanceof ParquetRowGroupScan){
+      return parquetScan.getBatch(context, (ParquetRowGroupScan) subScan,  Collections.<RecordBatch> emptyList());
+    }
+    else{
+      return super.visitSubScan(subScan, context);
+    }
+
   }
 
-  
   @Override
   public RecordBatch visitOp(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException {
     if(op instanceof SelectionVectorRemover){
       return svc.getBatch(context, (SelectionVectorRemover) op, getChildren(op, context));
     }else{
-      return super.visitOp(op, context);  
+      return super.visitOp(op, context);
     }
   }
 
@@ -97,7 +118,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
     root = sc.getRoot(context, op, getChildren(op, context));
     return null;
   }
-  
+
   @Override
   public RecordBatch visitFilter(Filter filter, FragmentContext context) throws ExecutionSetupException {
     return fbc.getBatch(context, filter, getChildren(filter, context));
@@ -121,7 +142,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
     }
     return children;
   }
-  
+
   public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException{
     ImplCreator i = new ImplCreator();
     root.accept(i, context);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
index 7e72683..3e9f1e2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
@@ -24,5 +24,6 @@ import org.apache.drill.exec.vector.ValueVector;
 public interface OutputMutator {
   public void removeField(MaterializedField field) throws SchemaChangeException;
   public void addField(ValueVector vector) throws SchemaChangeException ;
+  public void removeAllFields();
   public void setNewSchema() throws SchemaChangeException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 5a543b0..4227450 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -161,6 +161,12 @@ public class ScanBatch implements RecordBatch {
     }
 
     @Override
+    public void removeAllFields() {
+      holder.clear();
+      fieldVectorMap.clear();
+    }
+
+    @Override
     public void setNewSchema() throws SchemaChangeException {
       ScanBatch.this.schema = this.builder.build();
       ScanBatch.this.schemaChanged = true;