You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/04/14 04:35:13 UTC

[9/9] git commit: basic framework for physical plan. abstraction of graph classes.

basic framework for physical plan.  abstraction of graph classes.

Initial Work on Java Exec

WIP commit of java-exec


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b53933f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b53933f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b53933f2

Branch: refs/heads/execwork
Commit: b53933f225e21b890a9cc25545be7ce4223ba0ce
Parents: 2a6e1b3
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sat Mar 16 17:56:35 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat Apr 13 03:01:00 2013 -0700

----------------------------------------------------------------------
 .../apache/drill/common/config/DrillConfig.java    |   51 +-
 .../exceptions/DrillConfigurationException.java    |   43 +
 .../drill/common/exceptions/DrillIOException.java  |   42 +
 .../drill/common/expression/types/DataType.java    |    2 +
 .../apache/drill/common/optimize/Optimizer.java    |   45 +
 .../drill/common/physical/DataValidationMode.java  |   24 +
 .../org/apache/drill/common/physical/FieldSet.java |   23 +-
 .../apache/drill/common/physical/RecordField.java  |   43 +-
 .../apache/drill/common/physical/ParsePlan.java    |   36 +
 .../common/src/test/resources/basic_physical.json  |   42 +
 .../common/src/test/resources/dsort-physical.json  |   76 --
 .../common/src/test/resources/dsort_logical.json   |   40 +
 .../common/src/test/resources/dsort_physical.json  |   72 ++
 .../common/src/test/resources/simple_plan.json     |  133 +++
 sandbox/prototype/exec/java-exec/pom.xml           |  165 +++-
 .../prototype/exec/java-exec/rse/ClasspathRSE.java |   88 ++
 .../prototype/exec/java-exec/rse/ConsoleRSE.java   |   60 +
 .../exec/java-exec/rse/FileSystemRSE.java          |  144 +++
 .../exec/java-exec/rse/JSONDataWriter.java         |  142 +++
 .../exec/java-exec/rse/JSONRecordReader.java       |  183 ++++
 .../exec/java-exec/rse/OutputStreamWriter.java     |   78 ++
 sandbox/prototype/exec/java-exec/rse/QueueRSE.java |  100 ++
 sandbox/prototype/exec/java-exec/rse/RSEBase.java  |   71 ++
 .../prototype/exec/java-exec/rse/RSERegistry.java  |   85 ++
 .../prototype/exec/java-exec/rse/RecordReader.java |   28 +
 .../exec/java-exec/rse/RecordRecorder.java         |   32 +
 .../exec/java-exec/rse/ReferenceStorageEngine.java |   45 +
 .../org/apache/drill/exec/BufferAllocator.java     |   52 +
 .../apache/drill/exec/DirectBufferAllocator.java   |   47 +
 .../java/org/apache/drill/exec/ExecConstants.java  |   31 +
 .../apache/drill/exec/cache/DistributedCache.java  |   38 +
 .../org/apache/drill/exec/cache/HazelCache.java    |  133 +++
 .../drill/exec/cache/TemplatizedLogicalPlan.java   |   22 +
 .../drill/exec/cache/TemplatizedPhysicalPlan.java  |   22 +
 .../drill/exec/coord/ClusterCoordinator.java       |   47 +
 .../exec/coord/DrillServiceInstanceHelper.java     |   57 +
 .../drill/exec/coord/ZKClusterCoordinator.java     |  145 +++
 .../drill/exec/coord/ZKRegistrationHandle.java     |   32 +
 .../java/org/apache/drill/exec/disk/Spool.java     |   29 +
 .../exec/exception/DrillbitStartupException.java   |   46 +
 .../exec/exception/ExecutionSetupException.java    |   45 +
 .../exec/exception/SchemaChangeException.java      |   52 +
 .../drill/exec/exception/SetupException.java       |   46 +
 .../org/apache/drill/exec/ops/BatchIterator.java   |   32 +
 .../org/apache/drill/exec/ops/FragmentContext.java |   49 +
 .../org/apache/drill/exec/ops/OutputMutator.java   |   28 +
 .../org/apache/drill/exec/ops/QueryOutcome.java    |   22 +
 .../java/org/apache/drill/exec/ops/ScanBatch.java  |  157 +++
 .../exec/ops/exchange/PartitioningSender.java      |   23 +
 .../drill/exec/ops/exchange/RandomReceiver.java    |   24 +
 .../drill/exec/ops/exchange/RecordBatchSender.java |   24 +
 .../apache/drill/exec/opt/IdentityOptimizer.java   |   40 +
 .../org/apache/drill/exec/planner/ExecPlanner.java |   27 +
 .../org/apache/drill/exec/record/BatchSchema.java  |  123 +++
 .../java/org/apache/drill/exec/record/DeadBuf.java |  848 +++++++++++++++
 .../drill/exec/record/InvalidValueAccessor.java    |   46 +
 .../drill/exec/record/MaterializedField.java       |   79 ++
 .../org/apache/drill/exec/record/RecordBatch.java  |   85 ++
 .../org/apache/drill/exec/record/RecordMaker.java  |   22 +
 .../record/vector/AbstractFixedValueVector.java    |   60 +
 .../apache/drill/exec/record/vector/AnyVector.java |   30 +
 .../drill/exec/record/vector/BaseValueVector.java  |  104 ++
 .../apache/drill/exec/record/vector/BitUtil.java   |  108 ++
 .../apache/drill/exec/record/vector/BitVector.java |  118 ++
 .../apache/drill/exec/record/vector/BufBitSet.java |  847 ++++++++++++++
 .../drill/exec/record/vector/ByteVector.java       |   48 +
 .../drill/exec/record/vector/Int32Vector.java      |   52 +
 .../exec/record/vector/NullableValueVector.java    |   70 ++
 .../drill/exec/record/vector/ValueVector.java      |   81 ++
 .../drill/exec/record/vector/VariableVector.java   |   78 ++
 .../org/apache/drill/exec/rpc/BasicClient.java     |   81 ++
 .../org/apache/drill/exec/rpc/BasicServer.java     |  106 ++
 .../drill/exec/rpc/ChannelClosedException.java     |   39 +
 .../apache/drill/exec/rpc/CoordinationQueue.java   |   87 ++
 .../org/apache/drill/exec/rpc/DrillRpcFuture.java  |   92 ++
 .../apache/drill/exec/rpc/InboundRpcMessage.java   |   50 +
 .../apache/drill/exec/rpc/NamedThreadFactory.java  |   48 +
 .../apache/drill/exec/rpc/OutboundRpcMessage.java  |   50 +
 .../drill/exec/rpc/PositiveAtomicInteger.java      |   39 +
 .../apache/drill/exec/rpc/RemoteRpcException.java  |   38 +
 .../java/org/apache/drill/exec/rpc/Response.java   |   41 +
 .../java/org/apache/drill/exec/rpc/RpcBus.java     |  172 +++
 .../org/apache/drill/exec/rpc/RpcConstants.java    |   26 +
 .../java/org/apache/drill/exec/rpc/RpcDecoder.java |  142 +++
 .../java/org/apache/drill/exec/rpc/RpcEncoder.java |  127 +++
 .../org/apache/drill/exec/rpc/RpcException.java    |   45 +
 .../apache/drill/exec/rpc/RpcExceptionHandler.java |   52 +
 .../java/org/apache/drill/exec/rpc/RpcMessage.java |   45 +
 .../exec/rpc/ZeroCopyProtobufLengthDecoder.java    |   80 ++
 .../org/apache/drill/exec/rpc/bit/BitClient.java   |   62 ++
 .../java/org/apache/drill/exec/rpc/bit/BitCom.java |   69 ++
 .../apache/drill/exec/rpc/bit/BitComHandler.java   |  136 +++
 .../org/apache/drill/exec/rpc/bit/BitComImpl.java  |  142 +++
 .../org/apache/drill/exec/rpc/bit/BitServer.java   |   64 ++
 .../org/apache/drill/exec/rpc/bit/BitTunnel.java   |   63 ++
 .../org/apache/drill/exec/rpc/user/UserClient.java |   72 ++
 .../org/apache/drill/exec/rpc/user/UserServer.java |   90 ++
 .../org/apache/drill/exec/schema/BackedRecord.java |   44 +
 .../org/apache/drill/exec/schema/DataRecord.java   |   56 +
 .../org/apache/drill/exec/schema/DiffSchema.java   |   66 ++
 .../java/org/apache/drill/exec/schema/Field.java   |  135 +++
 .../org/apache/drill/exec/schema/IdGenerator.java  |   13 +
 .../org/apache/drill/exec/schema/ListSchema.java   |  108 ++
 .../org/apache/drill/exec/schema/NamedField.java   |   44 +
 .../org/apache/drill/exec/schema/ObjectSchema.java |   91 ++
 .../org/apache/drill/exec/schema/OrderedField.java |   33 +
 .../java/org/apache/drill/exec/schema/Record.java  |   29 +
 .../org/apache/drill/exec/schema/RecordSchema.java |   29 +
 .../drill/exec/schema/SchemaIdGenerator.java       |   36 +
 .../apache/drill/exec/schema/SchemaRecorder.java   |  122 +++
 .../exec/schema/json/jackson/JacksonHelper.java    |   63 ++
 .../exec/schema/json/jackson/PhysicalOperator.java |   36 +
 .../json/jackson/PhysicalOperatorIterator.java     |   31 +
 .../drill/exec/schema/json/jackson/ScanJson.java   |  203 ++++
 .../transform/ProtobufSchemaTransformer.java       |  109 ++
 .../exec/schema/transform/SchemaTransformer.java   |   30 +
 .../org/apache/drill/exec/server/Drillbit.java     |  116 ++
 .../apache/drill/exec/server/DrillbitContext.java  |   65 ++
 .../apache/drill/exec/server/StartupOptions.java   |   66 ++
 .../apache/drill/exec/service/ServiceEngine.java   |   73 ++
 .../drill/exec/store/QueryOptimizerRule.java       |   21 +
 .../org/apache/drill/exec/store/RecordReader.java  |   49 +
 .../apache/drill/exec/store/RecordRecorder.java    |   36 +
 .../org/apache/drill/exec/store/StorageEngine.java |   92 ++
 .../drill/exec/store/StorageEngineRegistry.java    |   82 ++
 .../java-exec/src/main/protobuf/Coordination.proto |   32 +
 .../src/main/protobuf/ExecutionProtos.proto        |   65 ++
 .../java-exec/src/main/protobuf/GeneralRPC.proto   |   35 +
 .../java-exec/src/main/protobuf/SchemaDef.proto    |   37 +
 .../exec/java-exec/src/main/protobuf/User.proto    |   93 ++
 .../java-exec/src/main/resources/drill-module.conf |   28 +
 .../java-exec/src/test/java/BBOutputStream.java    |   38 +
 .../src/test/java/CompressingBytesColumn.java      |   46 +
 .../exec/java-exec/src/test/java/ExternalSort.java |   21 +
 .../src/test/java/GenerateExternalSortData.java    |  124 +++
 .../drill/exec/record/column/SimpleExec.java       |   30 +
 .../drill/exec/record/vector/TestOpenBitSet.java   |  361 ++++++
 .../apache/drill/exec/rpc/user/UserRpcTest.java    |  107 ++
 .../apache/drill/exec/server/StartDrillbit.java    |   31 +
 .../exec/java-exec/src/test/resources/logback.xml  |   45 +
 sandbox/prototype/exec/ref/pom.xml                 |    7 +-
 141 files changed, 10854 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
index a775867..b738002 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.CopyOnWriteArrayList;
 
+import org.apache.drill.common.exceptions.DrillConfigurationException;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.logical.StorageEngineConfigBase;
 import org.apache.drill.common.logical.data.LogicalOperatorBase;
@@ -59,6 +60,14 @@ public final class DrillConfig extends NestedConfig{
   };
   
   /**
+   * Create a DrillConfig object using the default config file name 
+   * @return The new DrillConfig object.
+   */
+  public static DrillConfig create() {
+    return create(null);
+  }
+  
+  /**
    * <p>
    * DrillConfig loads up Drill configuration information. It does this utilizing a combination of classpath scanning
    * and Configuration fallbacks provided by the TypeSafe configuration library. The order of precedence is as
@@ -68,16 +77,20 @@ public final class DrillConfig extends NestedConfig{
    * Configuration values are retrieved as follows:
    * <ul>
    * <li>Check a single copy of "drill-override.conf". If multiple copies are on the classpath, behavior is
-   * indeterminate.</li>
+   * indeterminate.  If a non-null value for overrideFileName is provided, this is utilized instead of drill-override.conf.</li>
    * <li>Check all copies of "drill-module.conf". Loading order is indeterminate.</li>
    * <li>Check a single copy of "drill-default.conf". If multiple copies are on the classpath, behavior is
    * indeterminate.</li>
    * </ul>
    * 
    * </p>
-   * * @return A merged Config object.
+   *  @param overrideFileName The name of the file to use for override purposes.
+   *  @return A merged Config object.
    */
-  public static DrillConfig create() {
+  public static DrillConfig create(String overrideFileName) {
+    
+    overrideFileName = overrideFileName == null ? CommonConstants.CONFIG_OVERRIDE : overrideFileName;
+    
     // first we load defaults.
     Config fallback = ConfigFactory.load(CommonConstants.CONFIG_DEFAULT);
     Collection<URL> urls = PathScanner.getConfigURLs();
@@ -86,10 +99,40 @@ public final class DrillConfig extends NestedConfig{
       fallback = ConfigFactory.parseURL(url).withFallback(fallback);
     }
 
-    Config c = ConfigFactory.load(CommonConstants.CONFIG_OVERRIDE).withFallback(fallback).resolve();
+    Config c = ConfigFactory.load(overrideFileName).withFallback(fallback).resolve();
     return new DrillConfig(c);
   }
   
+  /**
+   * Loads the class named by the configuration value at {@code location} and verifies it is assignable to {@code clazz}.
+   * @throws DrillConfigurationException if no class name is configured, the class cannot be loaded, or it is not a {@code clazz}.
+   */
+  public <T> Class<T> getClassAt(String location, Class<T> clazz) throws DrillConfigurationException{
+    String className = this.getString(location);
+    if(className == null) throw new DrillConfigurationException(String.format("No class defined at location '%s'.  Expected a definition of the class [%s]", location, clazz.getCanonicalName()));
+    try{
+      Class<?> c = Class.forName(className);
+      if(!clazz.isAssignableFrom(c)) throw new DrillConfigurationException(String.format("The class [%s] listed at location '%s' should be of type [%s].  It isn't.", className, location, clazz.getCanonicalName()));
+      @SuppressWarnings("unchecked") Class<T> t = (Class<T>) c; // safe: assignability to clazz was checked on the previous line
+      return t;
+    }catch(Exception ex){
+      if(ex instanceof DrillConfigurationException) throw (DrillConfigurationException) ex; // don't re-wrap our own type-mismatch error
+      throw new DrillConfigurationException(String.format("Failure while initializing class [%s] described at configuration value '%s'.", className, location), ex);
+    }
+  }
+  
+  /** Instantiates, through its no-argument constructor, the class that {@link #getClassAt} resolves for {@code location}. */
+  public <T> T getInstanceOf(String location, Class<T> clazz) throws DrillConfigurationException{
+    Class<T> c = getClassAt(location, clazz);
+    try{
+      return c.newInstance();
+    }catch(Exception ex){
+      throw new DrillConfigurationException(String.format("Failure while instantiating class [%s] located at '%s'.", c.getName(), location), ex);
+    }
+  }
+  
+
+  
   public void setSinkQueues(int number, Queue<Object> queue){
     sinkQueues.set(number, queue);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/main/java/org/apache/drill/common/exceptions/DrillConfigurationException.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/exceptions/DrillConfigurationException.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/exceptions/DrillConfigurationException.java
new file mode 100644
index 0000000..aa83758
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/exceptions/DrillConfigurationException.java
@@ -0,0 +1,43 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.exceptions;
+
+public class DrillConfigurationException extends DrillException {
+  public DrillConfigurationException() {
+    super();
+  }
+
+  public DrillConfigurationException(String message, Throwable cause, boolean enableSuppression,
+      boolean writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+  }
+
+  public DrillConfigurationException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public DrillConfigurationException(String message) {
+    super(message);
+  }
+
+  public DrillConfigurationException(Throwable cause) {
+    super(cause);
+  }
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConfigurationException.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/main/java/org/apache/drill/common/exceptions/DrillIOException.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/exceptions/DrillIOException.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/exceptions/DrillIOException.java
new file mode 100644
index 0000000..cd7d4ab
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/exceptions/DrillIOException.java
@@ -0,0 +1,42 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.exceptions;
+
+import java.io.IOException;
+
+public class DrillIOException extends IOException{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillIOException.class);
+
+  public DrillIOException() {
+    super();
+  }
+
+  public DrillIOException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public DrillIOException(String message) {
+    super(message);
+  }
+
+  public DrillIOException(Throwable cause) {
+    super(cause);
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/types/DataType.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/types/DataType.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/types/DataType.java
index 776a9e8..60d26dc 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/types/DataType.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/types/DataType.java
@@ -54,6 +54,8 @@ public abstract class DataType {
   public static final DataType LATEBIND = new LateBindType();
   public static final DataType BOOLEAN = new AtomType("BOOLEAN", Comparability.EQUAL, false);
   public static final DataType BYTES = new AtomType("BYTES", Comparability.ORDERED, false);
+  public static final DataType SIGNED_BYTE = new AtomType("SIGNED_BYTE", Comparability.ORDERED, true);
+  public static final DataType SIGNED_INT16 = new AtomType("SIGNED_INT16", Comparability.ORDERED, true);
   public static final DataType NVARCHAR = new AtomType("VARCHAR", Comparability.ORDERED, false);
   public static final DataType FLOAT32 = new AtomType("FLOAT32", Comparability.ORDERED, true);
   public static final DataType FLOAT64 = new AtomType("FLOAT64", Comparability.ORDERED, true);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/main/java/org/apache/drill/common/optimize/Optimizer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/optimize/Optimizer.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/optimize/Optimizer.java
new file mode 100644
index 0000000..4b2037c
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/optimize/Optimizer.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.optimize;
+
+import java.io.Closeable;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillConfigurationException;
+import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.physical.PhysicalPlan;
+
+public abstract class Optimizer implements Closeable{
+  /** Configuration key naming the Optimizer implementation class that {@link #getOptimizer(DrillConfig)} instantiates. */
+  public static final String OPTIMIZER_IMPL_KEY = "drill.exec.optimizer.implementation";
+  /** Invoked by {@link #getOptimizer(DrillConfig)} on the freshly created instance before it is returned. */
+  public abstract void init(DrillConfig config);
+  /** Produces a {@link PhysicalPlan} for the supplied {@link LogicalPlan}. */
+  public abstract PhysicalPlan optimize(OptimizationContext context, LogicalPlan plan);
+  public abstract void close();
+  /** Instantiates the class configured at {@link #OPTIMIZER_IMPL_KEY}, calls {@link #init(DrillConfig)} on it and returns it. */
+  public static Optimizer getOptimizer(DrillConfig config) throws DrillConfigurationException{
+    Optimizer o = config.getInstanceOf(OPTIMIZER_IMPL_KEY, Optimizer.class);
+    o.init(config);
+    return o;
+  }
+  /** Context object handed to {@link #optimize}; it currently exposes only a priority. */
+  public interface OptimizationContext{
+    public int getPriority();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/DataValidationMode.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/DataValidationMode.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/DataValidationMode.java
new file mode 100644
index 0000000..6de2cfd
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/DataValidationMode.java
@@ -0,0 +1,24 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.physical;
+
+public enum DataValidationMode {
+  TERMINATE, // terminate the query if the data doesn't match expected.
+  DROP_RECORD, // drop the record that doesn't match the expected situation.
+  SINK_RECORD // record the failed record along with the rule violation in a secondary location.
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/FieldSet.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/FieldSet.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/FieldSet.java
index 05fc49d..c76098d 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/FieldSet.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/FieldSet.java
@@ -18,7 +18,6 @@
 package org.apache.drill.common.physical;
 
 import java.io.IOException;
-import java.util.HashSet;
 import java.util.List;
 
 import org.apache.drill.common.physical.FieldSet.De;
@@ -42,19 +41,10 @@ import com.google.common.collect.Lists;
 public class FieldSet {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FieldSet.class);
   
-  private List<RecordField> incoming = Lists.newArrayList();
-  private List<RecordField> outgoing = Lists.newArrayList();
+  private List<RecordField> fields;
   
   public FieldSet(Iterable<RecordField> fields){
-    for(RecordField f : fields){
-      if(f.getRoute().isIn()){
-        incoming.add(f);
-      }
-      
-      if(f.getRoute().isOut()){
-        outgoing.add(f);
-      }
-    }
+    this.fields = Lists.newArrayList(fields);
   }
   
 
@@ -83,14 +73,7 @@ public class FieldSet {
     @Override
     public void serialize(FieldSet value, JsonGenerator jgen, SerializerProvider provider) throws IOException,
         JsonGenerationException {
-      HashSet<RecordField> fields = new HashSet<RecordField>();
-      for(RecordField f: value.incoming){
-        fields.add(f);
-      }
-      for(RecordField f: value.outgoing){
-        fields.add(f);
-      }
-      jgen.writeObject(Lists.newArrayList(fields));
+      jgen.writeObject(value.fields);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/RecordField.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/RecordField.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/RecordField.java
index 821f286..2867084 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/RecordField.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/RecordField.java
@@ -28,14 +28,14 @@ public class RecordField {
   
   private String name;
   private DataType type;
-  private Route route;
+  private ValueMode mode;
   
   @JsonCreator
-  public RecordField(@JsonProperty("name") String name, @JsonProperty("type") DataType type, @JsonProperty("route") Route route) {
+  public RecordField(@JsonProperty("name") String name, @JsonProperty("type") DataType type, @JsonProperty("mode") ValueMode mode) {
     super();
     this.name = name;
     this.type = type;
-    this.route = route;
+    this.mode = mode;
   }
 
   public String getName() {
@@ -46,34 +46,21 @@ public class RecordField {
     return type;
   }
 
-  public Route getRoute() {
-    return route;
+  public ValueMode getMode() {
+    return mode;
   }
   
+  public static enum ValueMode {
+    VECTOR,
+    DICT,
+    RLE
+  }
   
+  public static enum ValueType {
+    OPTIONAL,
+    REQUIRED, 
+    REPEATED
+  }
   
-  public static enum Route {
-    IN(true, false), 
-    OUT(false, true), 
-    THROUGH(true, true), 
-    OPAQUE(true, true);
-    
-    final boolean in;
-    final boolean out;
-    
-    Route(boolean in, boolean out){
-      this.in = in;
-      this.out = out;
-    }
-
-    public boolean isIn() {
-      return in;
-    }
-
-    public boolean isOut() {
-      return out;
-    }
-    
-  }  
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/ParsePlan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/ParsePlan.java b/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/ParsePlan.java
new file mode 100644
index 0000000..e3e43aa
--- /dev/null
+++ b/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/ParsePlan.java
@@ -0,0 +1,36 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.physical;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+public class ParsePlan {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParsePlan.class);
+  
+  
+  @Test public void parseSimplePlan() throws Exception{
+    DrillConfig c = DrillConfig.create();
+    PhysicalPlan plan = PhysicalPlan.parse(c, Files.toString(FileUtils.getResourceAsFile("/dsort_physical.json"), Charsets.UTF_8));
+    System.out.println(plan.unparse(c));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/test/resources/basic_physical.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/test/resources/basic_physical.json b/sandbox/prototype/common/src/test/resources/basic_physical.json
new file mode 100644
index 0000000..4d1d329
--- /dev/null
+++ b/sandbox/prototype/common/src/test/resources/basic_physical.json
@@ -0,0 +1,42 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+    storage:{
+        fs1:{
+            type:"mock"
+        }
+    },
+    graph:[
+        {
+            @id:1,
+            pop:"scan",
+            storageengine:"fs1",
+            entries:[
+            	{"test1"}
+           	],
+            output:[
+                { "name":"key", mode: "VECTOR", type:"SINT32"},
+                { "name":"value", mode: "VECTOR", type:"SINT32"}
+            ]
+        },
+        {
+            @id:2,
+            child:1,
+            pop: "store",
+            mode: "SYSTEM_CHOICE",
+            storageengine: "fs1",
+            entries:[
+                {
+                    path:"/sort/sorted/${partition_number}.seq",
+                    key:"Text",
+                    type:"JAVA_SEQUENCE"
+                }
+            ] 
+        }           
+    ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/test/resources/dsort-physical.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/test/resources/dsort-physical.json b/sandbox/prototype/common/src/test/resources/dsort-physical.json
deleted file mode 100644
index 3c57a0a..0000000
--- a/sandbox/prototype/common/src/test/resources/dsort-physical.json
+++ /dev/null
@@ -1,76 +0,0 @@
-{
-    head:{
-        type:"APACHE_DRILL_PHYSICAL",
-        version:"1",
-        generator:{
-            type:"manual"
-        }
-    },
-    storage:{
-        fs1:{
-            type:"mock"
-        }
-    },
-    graph:[
-        {
-            @id:1,
-            pop:"scan",
-            storageengine:"fs1",
-            entries:[{}],
-            fields:[
-                { "name":"key", route: "OUT", type:"LATE"},
-                { "name":"value", route: "OUT", type:"LATE"}
-            ]
-        },
-        {
-            @id:2,
-            child: 1,
-            pop:"quicknwaysort",
-            orderings:[
-                {
-                    order: "DESC",
-                    expr: "data.key"
-                }
-            ],
-            fields:[
-                { "name":"key", route: "THROUGH", type:"LATE"},
-                { "name":"value", route: "OPAQUE", type:"LATE"}
-            ]
-
-        },
-        {
-            @id:3,
-            child: 2,
-            pop:"exchange",
-            partition:{
-                mode:"RANGE",
-                exprs:["key"]
-            },
-            stitch:{
-                mode:"RANDOM"
-            },
-            fields:[
-                { "name":"key", route: "THROUGH", type:"LATE"},
-                { "name":"value", route: "OPAQUE", type:"LATE"}
-            ]
-        },
-        {
-            @id:4,
-            child:3,
-            pop: "store",
-            mode: "SYSTEM_CHOICE",
-            storageengine: "fs1",
-            entries:[
-                {
-                    path:"/sort/sorted/${partition_number}.seq",
-                    key:"Text",
-                    type:"JAVA_SEQUENCE"
-                }
-            ],
-            fields:[
-                { "name":"key", route: "IN", type:"LATE"},
-                { "name":"value", route: "IN", type:"LATE"}
-            ] 
-        }           
-    ]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/test/resources/dsort_logical.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/test/resources/dsort_logical.json b/sandbox/prototype/common/src/test/resources/dsort_logical.json
new file mode 100644
index 0000000..83d30e8
--- /dev/null
+++ b/sandbox/prototype/common/src/test/resources/dsort_logical.json
@@ -0,0 +1,40 @@
+{
+  head:{ type:"apache_drill_logical_plan", version:"1", generator:{ type:"manual", info:"na"}},
+  storage:[ { type:"fs", name:"fs1", root:"file:///" }],
+  query:[ { op: "sequence", sequence: [
+    {
+      op:"scan",
+      storageengine:"fs1",
+      ref: "data",
+      selection: {
+        path: "/sort/unsorted/*.seq",
+        type: "JAVA_SEQUENCE"
+      }
+    },
+    {
+      op: "order",
+      orderings: [
+        {order: "desc", expr: "data.key" }
+      ]
+    }, 
+    {
+      op: "project",
+      projections: [
+        { ref: "output.key", expr: "data.key" },
+        { ref: "output.value", expr: "data.value" }
+      ]
+    },
+	{
+      op: "store",
+      storageengine: "fs1",
+      target: {
+        path: "/sort/sorted/${partition}.seq",
+        type: "JAVA_SEQUENCE",
+        partition: {
+          type: "ORDERED",
+          exprs: ["key"]
+        }
+      }
+    }
+  ]}]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/test/resources/dsort_physical.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/test/resources/dsort_physical.json b/sandbox/prototype/common/src/test/resources/dsort_physical.json
new file mode 100644
index 0000000..7c31df2
--- /dev/null
+++ b/sandbox/prototype/common/src/test/resources/dsort_physical.json
@@ -0,0 +1,72 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+    storage:{
+        fs1:{
+            type:"mock"
+        }
+    },
+    graph:[
+        {
+            @id:1,
+            pop:"scan",
+            storageengine:"fs1",
+            entries:[{}],
+            output:[
+                { "name":"key", mode: "VECTOR", type:"LATE"},
+                { "name":"value", mode: "VECTOR", type:"LATE"}
+            ]
+        },
+        {
+            @id:2,
+            child: 1,
+            pop:"quicknwaysort",
+            orderings:[
+                {
+                    order: "DESC",
+                    expr: "data.key"
+                }
+            ],
+            output:[
+                { "name":"key", mode: "VECTOR", type:"LATE"},
+                { "name":"value", mode: "VECTOR", type:"LATE"}
+            ]
+
+        },
+        {
+            @id:3,
+            child: 2,
+            pop:"exchange",
+            partition:{
+                mode:"RANGE",
+                exprs:["key"]
+            },
+            stitch:{
+                mode:"RANDOM"
+            },
+            fields:[
+                { "name":"key", mode: "VECTOR", type:"LATE"},
+                { "name":"value", mode: "VECTOR", type:"LATE"}
+            ]
+        },
+        {
+            @id:4,
+            child:3,
+            pop: "store",
+            mode: "SYSTEM_CHOICE",
+            storageengine: "fs1",
+            entries:[
+                {
+                    path:"/sort/sorted/${partition_number}.seq",
+                    key:"Text",
+                    type:"JAVA_SEQUENCE"
+                }
+            ] 
+        }           
+    ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/test/resources/simple_plan.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/test/resources/simple_plan.json b/sandbox/prototype/common/src/test/resources/simple_plan.json
new file mode 100644
index 0000000..2457b1f
--- /dev/null
+++ b/sandbox/prototype/common/src/test/resources/simple_plan.json
@@ -0,0 +1,133 @@
+{
+  head:{
+    type:"apache_drill_logical_plan",
+    version:"1",
+    generator:{
+      type:"manual",
+      info:"na"
+    }
+  },
+  storage:{
+    logs: {
+      type:"text",
+	  file: "local://logs/*.log",
+	  compress:"gzip",
+	  line-delimiter:"\n",
+	  record-maker:{
+	    type:"first-row",
+	    delimiter:","
+	  }
+    },
+    users: {
+      type:"mongo",
+      name:"users",
+      connection:"mongodb://blue:red@localhost/users"
+    },
+    mysql: {
+      type:"mysql",
+      name:"mysql",
+      connection:"jdbc:mysql://localhost/main"
+    }
+  },
+  query:[
+    {
+      @id:"1",
+      op:"scan",
+      memo:"initial_scan",
+      storageengine:"local-logs",
+      selection: {}
+    },
+    {
+      @id:"2",
+      input:"1",
+      memo:"transform1",
+      op:"transform",
+      transforms:[
+        {
+          ref:"userId",
+          expr:"regex_like('activity.cookie', \"persistent=([^;]*)\")"
+        },
+        {
+          ref:"session",
+          expr:"regex_like('activity.cookie', \"session=([^;]*)\")"
+        }
+      ]
+    },
+    {
+      @id:"3",
+      input:"2",
+      memo:"transform2",
+      op:"transform",
+      transforms:[
+        {
+          ref:"userId",
+          expr:"regex_like('activity.cookie', \"persistent=([^;]*)\")"
+        },
+        {
+          ref:"session",
+          expr:"regex_like('activity.cookie', \"session=([^;]*)\")"
+        }
+      ]
+    },
+    {
+      @id:"7",
+      input:"3",
+      op:"sequence",
+      do:[
+        {
+          op:"transform",
+          memo:"seq_transform",
+          transforms:[
+            {
+              ref:"happy",
+              expr:"regex_like('ep2', \"dink\")"
+            }
+          ]
+        }
+        ,
+        {
+          op:"transform",
+          memo:"last_transform",
+          transforms:[
+            {
+              ref:"abc",
+              expr:"123"
+            }
+          ]
+        }
+      ]
+    },
+    {
+      @id:"10",
+      input:"3",
+      op:"transform",
+      memo:"t3",
+      transforms:[
+        {
+          ref:"happy",
+          expr:"regex_like('ep2', \"dink\")"
+        }
+      ]
+    },
+    {
+      @id:12,
+      op:"join",
+      type: "inner",
+      left:"7",
+      right:"10",
+      conditions: [{relationship:"==", left: "1", right: "1" }]
+    }
+    ,
+    {
+      input: 12,
+      op: "store",
+      memo: "output sink",
+      target: {
+        file: "console:///stdout"
+      }
+      
+    }
+
+    
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/pom.xml b/sandbox/prototype/exec/java-exec/pom.xml
index a458160..663bab4 100644
--- a/sandbox/prototype/exec/java-exec/pom.xml
+++ b/sandbox/prototype/exec/java-exec/pom.xml
@@ -1,15 +1,154 @@
 <?xml version="1.0"?>
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <artifactId>exec-parent</artifactId>
-    <groupId>org.apache.drill.exec</groupId>
-    <version>1.0-SNAPSHOT</version>
-  </parent>
-  <artifactId>java-exec</artifactId>
-  <name>java-exec</name>
-
-  <dependencies>
-  </dependencies>
+<project
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+	xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<artifactId>exec-parent</artifactId>
+		<groupId>org.apache.drill.exec</groupId>
+		<version>1.0-SNAPSHOT</version>
+	</parent>
+	<artifactId>java-exec</artifactId>
+	<name>java-exec</name>
+
+	<properties>
+		<target.gen.source.path>${project.basedir}/target/generated-sources</target.gen.source.path>
+		<proto.cas.path>${project.basedir}/src/main/protobuf/</proto.cas.path>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>com.twitter</groupId>
+			<artifactId>parquet-column</artifactId>
+			<version>1.0.0-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.drill</groupId>
+			<artifactId>common</artifactId>
+			<version>1.0-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+			<groupId>com.beust</groupId>
+			<artifactId>jcommander</artifactId>
+			<version>1.30</version>
+		</dependency>
+		<dependency>
+			<groupId>com.netflix.curator</groupId>
+			<artifactId>curator-x-discovery</artifactId>
+			<version>1.1.9</version>
+			<exclusions>
+				<!-- <exclusion> -->
+				<!-- <artifactId>netty</artifactId> -->
+				<!-- <groupId>org.jboss.netty</groupId> -->
+				<!-- </exclusion> -->
+				<exclusion>
+					<artifactId>slf4j-log4j12</artifactId>
+					<groupId>org.slf4j</groupId>
+				</exclusion>
+				<exclusion>
+					<artifactId>log4j</artifactId>
+					<groupId>log4j</groupId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.xerial.snappy</groupId>
+			<artifactId>snappy-java</artifactId>
+			<version>1.0.5-M3</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-core</artifactId>
+			<version>1.1.0</version>
+			<exclusions>
+				<exclusion>
+					<artifactId>jets3t</artifactId>
+					<groupId>net.java.dev.jets3t</groupId>
+				</exclusion>
+				<exclusion>
+					<artifactId>commons-logging</artifactId>
+					<groupId>commons-logging</groupId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>com.carrotsearch</groupId>
+			<artifactId>hppc</artifactId>
+			<version>0.4.2</version>
+		</dependency>
+		<dependency>
+			<groupId>io.netty</groupId>
+			<artifactId>netty-all</artifactId>
+			<version>4.0.0.CR1</version>
+		</dependency>
+		<dependency>
+			<groupId>com.google.protobuf</groupId>
+			<artifactId>protobuf-java</artifactId>
+			<version>2.5.0</version>
+		</dependency>
+		<dependency>
+			<groupId>com.hazelcast</groupId>
+			<artifactId>hazelcast</artifactId>
+			<version>2.5</version>
+		</dependency>		
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>native-maven-plugin</artifactId>
+				<version>1.0-alpha-7</version>
+				<configuration>
+					<javahClassNames>
+						<javahClassName>org.apache.drill.exec.mem.ByteBufferAllocator</javahClassName>
+					</javahClassNames>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-antrun-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>generate-sources</id>
+						<phase>generate-sources</phase>
+						<configuration>
+							<tasks>
+								<mkdir dir="${target.gen.source.path}" />
+								<path id="proto.path.files">
+									<fileset dir="${proto.cas.path}">
+										<include name="*.proto" />
+									</fileset>
+								</path>
+								<pathconvert pathsep=" " property="proto.files"
+									refid="proto.path.files" />
+
+								<exec executable="protoc">
+									<arg value="--java_out=${target.gen.source.path}" />
+									<arg value="--proto_path=${proto.cas.path}" />
+									<arg line="${proto.files}" />
+								</exec>
+							</tasks>
+							<sourceRoot>${target.gen.source.path}</sourceRoot>
+						</configuration>
+						<goals>
+							<goal>run</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<!-- <plugin> -->
+			<!-- <groupId>com.github.igor-petruk.protobuf</groupId> -->
+			<!-- <artifactId>protobuf-maven-plugin</artifactId> -->
+			<!-- <version>0.6.2</version> -->
+			<!-- <executions> -->
+			<!-- <execution> -->
+			<!-- <goals> -->
+			<!-- <goal>run</goal> -->
+			<!-- </goals> -->
+			<!-- </execution> -->
+			<!-- </executions> -->
+			<!-- </plugin> -->
+		</plugins>
+	</build>
+
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/rse/ClasspathRSE.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/ClasspathRSE.java b/sandbox/prototype/exec/java-exec/rse/ClasspathRSE.java
new file mode 100644
index 0000000..aa8186d
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/rse/ClasspathRSE.java
@@ -0,0 +1,88 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StorageEngineConfigBase;
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.exec.ref.exceptions.SetupException;
+import org.apache.drill.exec.ref.rops.DataWriter.ConverterType;
+import org.apache.drill.exec.ref.rops.ROP;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+public class ClasspathRSE extends RSEBase { // Storage engine that serves scan input from classpath resources.
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClasspathRSE.class);
+
+  private DrillConfig dConfig;
+  private SchemaPath rootPath; // never read or assigned anywhere in this class
+  
+  public ClasspathRSE(ClasspathRSEConfig engineConfig, DrillConfig dConfig) throws SetupException{
+    this.dConfig = dConfig; // engineConfig itself is not used
+  }
+
+  // Engine-level configuration, annotated with the JSON type name "classpath"; it declares no settings.
+  @JsonTypeName("classpath")
+  public static class ClasspathRSEConfig extends StorageEngineConfigBase {
+  }
+  // Per-scan read entry: the resource path and its converter type; rootPath is assigned in getReadEntries.
+  public static class ClasspathInputConfig implements ReadEntry{
+    public String path;
+    public ConverterType type;
+    @JsonIgnore public SchemaPath rootPath; 
+  }
+
+  public boolean supportsRead() {
+    return true;
+  }
+  // Returns exactly one entry: the scan's selection read as a ClasspathInputConfig, tagged with the scan's output reference.
+  @Override
+  public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException {
+    ClasspathInputConfig c = scan.getSelection().getWith(dConfig, ClasspathInputConfig.class);
+    c.rootPath = scan.getOutputReference();
+    return Collections.singleton((ReadEntry) c);
+  }
+  // Opens a reader over the classpath resource named by the entry; throws IOException when the resource is not found.
+  @Override
+  public RecordReader getReader(ReadEntry readEntry, ROP parentROP) throws IOException {
+    ClasspathInputConfig e = getReadEntry(ClasspathInputConfig.class, readEntry);
+    URL u = RecordReader.class.getResource(e.path); // looked up relative to RecordReader's class unless e.path starts with '/'
+    if(u == null){
+      throw new IOException(String.format("Failure finding classpath resource %s.", e.path));
+    }
+    switch(e.type){ // a null type fails here with a NullPointerException
+    case JSON:
+      return new JSONRecordReader(e.rootPath, dConfig, u.openStream(), parentROP);
+    default:
+      throw new UnsupportedOperationException(); // JSON is the only converter type handled
+    }
+  }
+  
+  
+
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/rse/ConsoleRSE.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/ConsoleRSE.java b/sandbox/prototype/exec/java-exec/rse/ConsoleRSE.java
new file mode 100644
index 0000000..1570ea9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/rse/ConsoleRSE.java
@@ -0,0 +1,60 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.io.OutputStream;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.StorageEngineConfigBase;
+import org.apache.drill.common.logical.data.Store;
+import org.apache.drill.exec.ref.rops.DataWriter.ConverterType;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+public class ConsoleRSE extends RSEBase { // Storage engine that writes store output to System.out or System.err.
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConsoleRSE.class);
+  
+  private final DrillConfig dConfig;
+  // Console stream a store target may select.
+  public static enum Pipe {
+    STD_OUT, STD_ERR
+  };
+
+  public ConsoleRSE(ConsoleRSEConfig engineConfig, DrillConfig dConfig){
+    this.dConfig = dConfig; // engineConfig itself is not used
+  }
+  // Per-store target options; both fields carry defaults (STD_OUT and JSON).
+  public static class ConsoleOutputConfig {
+    public Pipe pipe = Pipe.STD_OUT;
+    public ConverterType type = ConverterType.JSON;
+  }
+  // Engine-level configuration, annotated with the JSON type name "console"; it declares no settings.
+  @JsonTypeName("console") public static class ConsoleRSEConfig extends StorageEngineConfigBase {}
+  
+  public boolean supportsWrite() {
+    return true;
+  }
+
+  @Override
+  public RecordRecorder getWriter(Store store) {
+    ConsoleOutputConfig config = store.getTarget().getWith(dConfig, ConsoleOutputConfig.class);
+    OutputStream out = config.pipe == Pipe.STD_OUT ? System.out : System.err; // anything other than STD_OUT, including null, selects System.err
+    return new OutputStreamWriter(out, config.type, false); // NOTE(review): the boolean presumably means "close the stream when done" - confirm
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/rse/FileSystemRSE.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/FileSystemRSE.java b/sandbox/prototype/exec/java-exec/rse/FileSystemRSE.java
new file mode 100644
index 0000000..522191b
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/rse/FileSystemRSE.java
@@ -0,0 +1,144 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StorageEngineConfigBase;
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.common.logical.data.Store;
+import org.apache.drill.exec.ref.exceptions.SetupException;
+import org.apache.drill.exec.ref.rops.DataWriter.ConverterType;
+import org.apache.drill.exec.ref.rops.ROP;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+public class FileSystemRSE extends RSEBase {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemRSE.class);
+
+  private FileSystem fs;
+  private Path basePath;
+  private DrillConfig dConfig;
+
+  public FileSystemRSE(FileSystemRSEConfig engineConfig, DrillConfig dConfig) throws SetupException{
+    this.dConfig = dConfig;
+    
+    try {
+      URI u = new URI(engineConfig.root);
+      String path = u.getPath();
+      
+      if(path.charAt(path.length()-1) != '/') throw new SetupException(String.format("The file root provided of %s included a file '%s'.  This must be a base path.", engineConfig.root, u.getPath()));
+      fs = FileSystem.get(u, new Configuration());
+      basePath = new Path(u.getPath());
+    } catch (URISyntaxException | IOException e) {
+      throw new SetupException("Failure while reading setting up file system root path.", e);
+    }
+  }
+
+  
+  @JsonTypeName("fs")
+  public static class FileSystemRSEConfig extends StorageEngineConfigBase {
+    private String root;
+
+    @JsonCreator
+    public FileSystemRSEConfig(@JsonProperty("root") String root) {
+      this.root = root;
+    }
+  }
+  
+  public static class FileSystemInputConfig {
+    public FileSpec[] files;
+  }
+  
+  public static class FileSpec{
+    public String path;
+    public ConverterType type;
+  }
+  
+  
+  public class FSEntry implements ReadEntry{
+    Path path;
+    ConverterType type;
+    SchemaPath rootPath;
+
+    public FSEntry(FileSpec spec, SchemaPath rootPath){
+      this.path = new Path(basePath, spec.path);
+      this.type = spec.type;
+      this.rootPath = rootPath;
+    }
+        
+  }
+
+  public class FileSystemOutputConfig {
+    public String file;
+    public ConverterType type;
+  }
+
+  public boolean supportsRead() {
+    return true;
+  }
+  
+  public boolean supportsWrite() {
+    return true;
+  }
+
+  @Override
+  public RecordRecorder getWriter(Store store) throws IOException {
+    FileSystemOutputConfig config = store.getTarget().getWith(dConfig, FileSystemOutputConfig.class);
+    OutputStream out = fs.create(new Path(basePath, config.file));
+    return new OutputStreamWriter(out, config.type, true);
+  }
+
+  @Override
+  public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException {
+    Set<ReadEntry> s = new HashSet<ReadEntry>();
+    for(FileSpec f : scan.getSelection().getWith(dConfig, FileSystemInputConfig.class).files){
+      s.add(new FSEntry(f, scan.getOutputReference()));
+    }
+    return s;
+  }
+
+  @Override
+  public RecordReader getReader(ReadEntry readEntry, ROP parentROP) throws IOException {
+    FSEntry e = getReadEntry(FSEntry.class, readEntry);
+    
+    switch(e.type){
+    case JSON:
+      return new JSONRecordReader(e.rootPath, dConfig, fs.open(e.path), parentROP);
+    default:
+      throw new UnsupportedOperationException();
+    }
+  }
+  
+  
+
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/rse/JSONDataWriter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/JSONDataWriter.java b/sandbox/prototype/exec/java-exec/rse/JSONDataWriter.java
new file mode 100644
index 0000000..24434d5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/rse/JSONDataWriter.java
@@ -0,0 +1,142 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.drill.exec.ref.rops.DataWriter;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+
+public class JSONDataWriter implements DataWriter{ // DataWriter that forwards each event to a Jackson JsonGenerator.
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONDataWriter.class);
+  
+  private final JsonGenerator g;
+//  private CharSequence transientName;
+  // Creates a UTF-8 generator over out and enables Jackson's default pretty printer.
+  public JSONDataWriter(OutputStream out) throws IOException{
+    JsonFactory f = new JsonFactory();
+    
+    this.g = f.createJsonGenerator(out, JsonEncoding.UTF8);
+    this.g.useDefaultPrettyPrinter();
+  }
+  // Converts a CharSequence to a String, casting when it already is one; a null argument throws NullPointerException.
+  private String s(CharSequence seq) {
+    String s = (seq instanceof String) ? (String) seq : seq.toString();
+    return s;
+  }
+  
+  @Override
+  public void startRecord() throws IOException {
+    // intentionally empty: nothing is written when a record starts
+  }
+
+  @Override
+  public void writeArrayStart(int length) throws IOException {
+    g.writeStartArray(); // length is ignored
+  }
+
+  @Override
+  public void writeArrayElementStart() throws IOException {
+  }
+
+  @Override
+  public void writeArrayElementEnd() throws IOException {
+  }
+
+  @Override
+  public void writeArrayEnd() throws IOException {
+    g.writeEndArray();
+  }
+
+  @Override
+  public void writeMapStart() throws IOException {
+    g.writeStartObject();
+  }
+
+  @Override
+  public void writeMapKey(CharSequence seq) throws IOException {
+    g.writeFieldName(s(seq));
+  }
+
+  @Override
+  public void writeMapValueStart() throws IOException {
+  }
+
+  @Override
+  public void writeMapValueEnd() throws IOException {
+  }
+
+  @Override
+  public void writeMapEnd() throws IOException {
+    g.writeEndObject();
+  }
+
+  @Override
+  public void writeBoolean(boolean b) throws IOException {
+    g.writeBoolean(b);
+  }
+
+  @Override
+  public void writeSInt32(int value) throws IOException {
+    g.writeNumber(value);
+  }
+
+  @Override
+  public void writeSInt64(long value) throws IOException {
+    g.writeNumber(value);
+  }
+
+  @Override
+  public void writeBytes(byte[] bytes) throws IOException {
+    g.writeBinary(bytes);
+  }
+
+  @Override
+  public void writeSFloat64(double value) throws IOException {
+    g.writeNumber(value);
+  }
+
+  @Override
+  public void writeSFloat32(float value) throws IOException {
+    g.writeNumber(value);
+  }
+
+  @Override
+  public void writeNullValue() throws IOException {
+    g.writeNull();
+  }
+
+  @Override
+  public void writeCharSequence(CharSequence value) throws IOException {
+    g.writeString(s(value));
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    g.writeRawValue("\n"); // the newline is passed to the generator as a raw value after each record
+  }
+  // Closes the underlying generator; unlike the methods above it is not marked @Override.
+  public void finish() throws IOException{
+    g.close();
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/rse/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/JSONRecordReader.java b/sandbox/prototype/exec/java-exec/rse/JSONRecordReader.java
new file mode 100644
index 0000000..7510e72
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/rse/JSONRecordReader.java
@@ -0,0 +1,183 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.RunOutcome;
+import org.apache.drill.exec.ref.UnbackedRecord;
+import org.apache.drill.exec.ref.exceptions.RecordException;
+import org.apache.drill.exec.ref.rops.ROP;
+import org.apache.drill.exec.ref.values.DataValue;
+import org.apache.drill.exec.ref.values.ScalarValues.BooleanScalar;
+import org.apache.drill.exec.ref.values.ScalarValues.BytesScalar;
+import org.apache.drill.exec.ref.values.ScalarValues.DoubleScalar;
+import org.apache.drill.exec.ref.values.ScalarValues.IntegerScalar;
+import org.apache.drill.exec.ref.values.ScalarValues.LongScalar;
+import org.apache.drill.exec.ref.values.ScalarValues.StringScalar;
+import org.apache.drill.exec.ref.values.SimpleArrayValue;
+import org.apache.drill.exec.ref.values.SimpleMapValue;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Charsets;
+
+/**
+ * {@link RecordReader} that pulls records out of a JSON stream.
+ *
+ * <p>Each call to the iterator's {@code next()} reads one JSON tree from the underlying parser, converts it into
+ * {@link DataValue}s and installs it under {@code rootPath} in a single {@link UnbackedRecord} that is reused for
+ * every record.
+ */
+public class JSONRecordReader implements RecordReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
+
+  private final InputStreamReader input;
+  // NOTE(review): nothing in this class assigns this field, so the warning in cleanup() always reports null.
+  private String file;
+  private final SchemaPath rootPath;
+  private final JsonParser parser;
+  // Shared by every iterator returned from getIterator(); next() overwrites its contents.
+  private final UnbackedRecord record = new UnbackedRecord();
+  private final ObjectMapper mapper;
+  private final ROP parent;
+
+  /**
+   * Creates a reader over the given stream, decoding it as UTF-8.
+   *
+   * @param rootPath path under which each parsed record is installed
+   * @param dConfig  configuration supplying the {@link ObjectMapper} used for parsing
+   * @param stream   source of JSON data; closed by {@link #cleanup()}
+   * @param parent   operator reported by the iterator's {@code getParent()}
+   * @throws IOException if the JSON parser cannot be created
+   */
+  public JSONRecordReader(SchemaPath rootPath, DrillConfig dConfig, InputStream stream, ROP parent) throws IOException {
+    this.input = new InputStreamReader(stream, Charsets.UTF_8);
+    this.mapper = dConfig.getMapper();
+    this.parser = mapper.getFactory().createJsonParser(input);
+    this.parent = parent;
+    this.rootPath = rootPath;
+  }
+
+  /** Iterator that advances the shared parser by one JSON tree per call to {@link #next()}. */
+  private class NodeIter implements RecordIterator {
+
+    /**
+     * Reads the next JSON tree into the shared record.
+     *
+     * @return {@code NONE_LEFT} when the parser has no further token or yields no tree, otherwise
+     *         {@code INCREMENTED_SCHEMA_CHANGED}
+     * @throws RecordException if reading from the stream fails
+     */
+    @Override
+    public NextOutcome next() {
+      try {
+        // A null token means the parser has reached the end of its input.
+        if (parser.nextToken() == null) {
+          return NextOutcome.NONE_LEFT;
+        }
+        JsonNode n = mapper.readTree(parser);
+        if (n == null) {
+          return NextOutcome.NONE_LEFT;
+        }
+        record.setClearAndSetRoot(rootPath, convert(n));
+        // todo, add schema checking here. Until then every record is reported as a schema change.
+        return NextOutcome.INCREMENTED_SCHEMA_CHANGED;
+      } catch (IOException e) {
+        throw new RecordException("Failure while reading record", null, e);
+      }
+    }
+
+    @Override
+    public RecordPointer getRecordPointer() {
+      return record;
+    }
+
+    @Override
+    public ROP getParent() {
+      return parent;
+    }
+
+  }
+
+  /**
+   * Recursively converts a Jackson node into the matching {@link DataValue}.
+   *
+   * @param node node to convert; {@code null}, JSON null and missing nodes all map to {@code DataValue.NULL_VALUE}
+   * @return the converted value
+   * @throws UnsupportedOperationException for big-decimal, big-integer and otherwise unrecognised nodes
+   */
+  private DataValue convert(JsonNode node) {
+    if (node == null || node.isNull() || node.isMissingNode()) {
+      return DataValue.NULL_VALUE;
+    } else if (node.isArray()) {
+      SimpleArrayValue arr = new SimpleArrayValue(node.size());
+      for (int i = 0; i < node.size(); i++) {
+        arr.addToArray(i, convert(node.get(i)));
+      }
+      return arr;
+    } else if (node.isObject()) {
+      SimpleMapValue map = new SimpleMapValue();
+      for (Iterator<String> iter = node.fieldNames(); iter.hasNext();) {
+        String name = iter.next();
+        map.setByName(name, convert(node.get(name)));
+      }
+      return map;
+    } else if (node.isBinary()) {
+      try {
+        return new BytesScalar(node.binaryValue());
+      } catch (IOException e) {
+        throw new RuntimeException("Failure converting binary value.", e);
+      }
+    } else if (node.isBigDecimal()) {
+      // TODO: no BigDecimal scalar is available yet.
+      throw new UnsupportedOperationException("BigDecimal values are not supported.");
+    } else if (node.isBigInteger()) {
+      // TODO: no BigInteger scalar is available yet.
+      throw new UnsupportedOperationException("BigInteger values are not supported.");
+    } else if (node.isBoolean()) {
+      return new BooleanScalar(node.asBoolean());
+    } else if (node.isFloatingPointNumber()) {
+      // Big decimals were rejected above, so any floating point node reaching here fits a double.
+      return new DoubleScalar(node.asDouble());
+    } else if (node.isInt()) {
+      return new IntegerScalar(node.asInt());
+    } else if (node.isLong()) {
+      return new LongScalar(node.asLong());
+    } else if (node.isTextual()) {
+      return new StringScalar(node.asText());
+    } else {
+      throw new UnsupportedOperationException(String.format("Don't know how to convert value of type %s.", node
+          .getClass().getCanonicalName()));
+    }
+
+  }
+
+  /**
+   * Returns a new iterator over the stream. All iterators share this reader's parser and record.
+   */
+  @Override
+  public RecordIterator getIterator() {
+    return new NodeIter();
+  }
+
+  /**
+   * Closes the parser and the underlying reader. Failures are logged rather than propagated.
+   */
+  @Override
+  public void cleanup() {
+    try {
+      try {
+        parser.close();
+      } finally {
+        // Close the reader even when closing the parser fails, so the stream is not leaked.
+        this.input.close();
+      }
+    } catch (IOException e) {
+      logger.warn("Error while closing InputStream for file {}", file, e);
+    }
+
+  }
+
+  /** No preparation is needed; the parser is created in the constructor. */
+  @Override
+  public void setup() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/rse/OutputStreamWriter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/OutputStreamWriter.java b/sandbox/prototype/exec/java-exec/rse/OutputStreamWriter.java
new file mode 100644
index 0000000..20d5b8f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/rse/OutputStreamWriter.java
@@ -0,0 +1,78 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.RunOutcome.OutcomeType;
+import org.apache.drill.exec.ref.rops.DataWriter;
+import org.apache.drill.exec.ref.rops.DataWriter.ConverterType;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+/**
+ * {@link RecordRecorder} that serialises records to an {@link OutputStream} through a {@link DataWriter}.
+ *
+ * <p>{@link #setup()} must be called before records are written, because it creates the writer.
+ */
+public class OutputStreamWriter implements RecordRecorder{
+
+  private final OutputStream stream;
+  // Non-null only when the stream is an FSDataOutputStream, which can report its position.
+  private final FSDataOutputStream posStream;
+  private DataWriter writer;
+  private final ConverterType type;
+  private final boolean closeStream;
+
+  /**
+   * @param stream      destination for the serialised records
+   * @param type        output format; only {@code JSON} is handled by {@link #setup()}
+   * @param closeStream whether {@link #finish(OutcomeType)} closes the stream or only flushes it
+   */
+  public OutputStreamWriter(OutputStream stream, ConverterType type, boolean closeStream){
+    this.stream = stream;
+    this.closeStream = closeStream;
+    this.type = type;
+    this.posStream = (stream instanceof FSDataOutputStream) ? (FSDataOutputStream) stream : null;
+  }
+
+  /**
+   * Creates the writer for the configured type.
+   *
+   * @throws UnsupportedOperationException if the type is anything other than {@code JSON}
+   */
+  @Override
+  public void setup() throws IOException {
+    switch(type){
+    case JSON:
+      this.writer = new JSONDataWriter(stream);
+      break;
+    default:
+      throw new UnsupportedOperationException(String.format("Unsupported converter type %s.", type));
+    }
+  }
+
+  /** Returns the stream position when it is available, otherwise 0. */
+  private long getPos() throws IOException{
+    if(posStream == null) {
+      return 0;
+    }
+    return posStream.getPos();
+  }
+
+  /**
+   * Writes one record.
+   *
+   * @return the stream position after the write, or 0 when the stream cannot report one
+   */
+  @Override
+  public long recordRecord(RecordPointer pointer) throws IOException {
+    pointer.write(writer);
+    return getPos();
+  }
+
+  /**
+   * Finishes the writer, then closes or flushes the stream depending on {@code closeStream}.
+   */
+  @Override
+  public void finish(OutcomeType outcome) throws IOException {
+    if(!closeStream){
+      writer.finish();
+      stream.flush();
+      return;
+    }
+    try {
+      writer.finish();
+    } finally {
+      // We own the stream here, so release it even when finishing the writer fails.
+      stream.close();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/rse/QueueRSE.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/QueueRSE.java b/sandbox/prototype/exec/java-exec/rse/QueueRSE.java
new file mode 100644
index 0000000..9a0a132
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/rse/QueueRSE.java
@@ -0,0 +1,100 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.StorageEngineConfigBase;
+import org.apache.drill.common.logical.data.Store;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.RunOutcome.OutcomeType;
+import org.apache.drill.exec.ref.exceptions.SetupException;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * Write-only storage engine that publishes records to an in-memory queue.
+ *
+ * <p>Each record is serialised to JSON and added to the queue as a {@code byte[]}. When a run finishes, its
+ * {@link OutcomeType} is added to the same queue.
+ */
+public class QueueRSE extends RSEBase {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueueRSE.class);
+
+  /** Capacity of the single sink queue created by this engine. */
+  private static final int SINK_QUEUE_CAPACITY = 100;
+
+  private final DrillConfig dConfig;
+  private final List<Queue<Object>> sinkQueues;
+
+  /**
+   * @param engineConfig engine configuration; not consulted by this constructor
+   * @param dConfig      configuration used by {@link #getWriter(Store)} to resolve the target queue
+   */
+  public QueueRSE(QueueRSEConfig engineConfig, DrillConfig dConfig) throws SetupException{
+    this.dConfig = dConfig;
+    this.sinkQueues = Collections.<Queue<Object>>singletonList(new ArrayBlockingQueue<Object>(SINK_QUEUE_CAPACITY));
+  }
+
+  /**
+   * Returns the sink queue held by this engine at the given index. Only index 0 exists.
+   */
+  public Queue<Object> getQueue(int number){
+    return sinkQueues.get(number);
+  }
+
+  @JsonTypeName("queue") public static class QueueRSEConfig extends StorageEngineConfigBase {}
+
+  /** Target description read from a store's configuration; {@code number} selects the queue. */
+  public static class QueueOutputInfo{
+    public int number;
+  }
+
+  @Override
+  public boolean supportsWrite() {
+    return true;
+  }
+
+  /**
+   * Builds a recorder that writes to the queue selected by the store's target.
+   */
+  @Override
+  public RecordRecorder getWriter(Store store) throws IOException {
+    QueueOutputInfo config = store.getTarget().getWith(dConfig, QueueOutputInfo.class);
+    // NOTE(review): the queue is looked up on dConfig, not through this engine's own getQueue(int) - confirm
+    // that this is intended.
+    Queue<Object> q = dConfig.getQueue(config.number);
+    return new QueueRecordRecorder(q);
+  }
+
+  /** Recorder that serialises each record to JSON bytes and adds them to a queue. */
+  private static class QueueRecordRecorder implements RecordRecorder{
+
+    private final Queue<Object> queue;
+
+    public QueueRecordRecorder(Queue<Object> queue) {
+      this.queue = queue;
+    }
+
+    @Override
+    public void setup() throws IOException {
+    }
+
+    /**
+     * Serialises the record with a fresh {@link JSONDataWriter} and adds the bytes to the queue.
+     *
+     * @return always 0
+     */
+    @Override
+    public long recordRecord(RecordPointer r) throws IOException {
+      final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      final JSONDataWriter writer = new JSONDataWriter(baos);
+      r.write(writer);
+      writer.finish();
+      queue.add(baos.toByteArray());
+      return 0;
+    }
+
+    /** Adds the run outcome to the queue. */
+    @Override
+    public void finish(OutcomeType type) throws IOException {
+      queue.add(type);
+    }
+
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/rse/RSEBase.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/RSEBase.java b/sandbox/prototype/exec/java-exec/rse/RSEBase.java
new file mode 100644
index 0000000..3f86c98
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/rse/RSEBase.java
@@ -0,0 +1,71 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.common.logical.data.Store;
+import org.apache.drill.common.util.PathScanner;
+import org.apache.drill.exec.ref.ExecRefConstants;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.exceptions.MajorException;
+import org.apache.drill.exec.ref.rops.ROP;
+
+import com.typesafe.config.Config;
+
+/**
+ * Base class for storage engines. By default an engine supports neither reads nor writes, and the
+ * corresponding accessors throw {@link UnsupportedOperationException}; subclasses override what they support.
+ */
+public abstract class RSEBase implements ReferenceStorageEngine{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RSEBase.class);
+
+  /** Builds the exception thrown by the default accessors; {@code format} takes the engine class name. */
+  private UnsupportedOperationException unsupported(String format) {
+    final String engineName = this.getClass().getCanonicalName();
+    return new UnsupportedOperationException(String.format(format, engineName));
+  }
+
+  /** @return {@code false} unless overridden */
+  @Override
+  public boolean supportsRead() {
+    return false;
+  }
+
+  /** @return {@code false} unless overridden */
+  @Override
+  public boolean supportsWrite() {
+    return false;
+  }
+
+  /** Always throws unless overridden. */
+  @Override
+  public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException {
+    throw unsupported("%s does not support reads.");
+  }
+
+  /** Always throws unless overridden. */
+  @Override
+  public RecordReader getReader(ReadEntry readEntry, ROP parentROP) throws IOException {
+    throw unsupported("%s does not support reads.");
+  }
+
+  /** Always throws unless overridden. */
+  @Override
+  public RecordRecorder getWriter(Store store) throws IOException {
+    throw unsupported("%s does not support writes.");
+  }
+
+  /**
+   * Scans the packages listed under {@code ExecRefConstants.STORAGE_ENGINE_SCAN_PACKAGES} for storage engine
+   * implementations.
+   *
+   * @return the implementation classes found, as an array
+   */
+  public static Class<?>[] getSubTypes(Config config){
+    final Collection<Class<? extends ReferenceStorageEngine>> found =
+        PathScanner.scanForImplementations(ReferenceStorageEngine.class, config.getStringList(ExecRefConstants.STORAGE_ENGINE_SCAN_PACKAGES));
+    final Class<?>[] result = new Class<?>[found.size()];
+    return found.toArray(result);
+  }
+
+  /**
+   * Casts a read entry to the expected type.
+   *
+   * @throws MajorException if the entry is not an instance of {@code c}
+   */
+  @SuppressWarnings("unchecked")
+  protected <T extends ReadEntry> T getReadEntry(Class<T> c, ReadEntry entry){
+    final Class<?> actual = entry.getClass();
+    if(c.isAssignableFrom(actual)) {
+      return (T) entry;
+    }
+    throw new MajorException(String.format("Expected entry type was invalid.  Expected entry of type %s but received type of %s.", c.getCanonicalName(), actual.getCanonicalName()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/rse/RSERegistry.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/RSERegistry.java b/sandbox/prototype/exec/java-exec/rse/RSERegistry.java
new file mode 100644
index 0000000..4266aac
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/rse/RSERegistry.java
@@ -0,0 +1,85 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.StorageEngineConfig;
+import org.apache.drill.common.util.PathScanner;
+import org.apache.drill.exec.ref.ExecRefConstants;
+import org.apache.drill.exec.ref.exceptions.SetupException;
+
+import com.typesafe.config.Config;
+
+public class RSERegistry {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RSERegistry.class);
+  
+  private Map<Object, Constructor<? extends ReferenceStorageEngine>> availableEngines = new HashMap<Object, Constructor<? extends ReferenceStorageEngine>>();
+  private Map<StorageEngineConfig, ReferenceStorageEngine> activeEngines = new HashMap<StorageEngineConfig, ReferenceStorageEngine>();
+  private DrillConfig config;
+  
+  public RSERegistry(DrillConfig config){
+    this.config = config;
+    setup(config);
+  }
+  
+  @SuppressWarnings("unchecked")
+  public void setup(DrillConfig config){
+    Collection<Class<? extends ReferenceStorageEngine>> engines = PathScanner.scanForImplementations(ReferenceStorageEngine.class, config.getStringList(ExecRefConstants.STORAGE_ENGINE_SCAN_PACKAGES));
+    logger.debug("Loading storage engines {}", engines);
+    for(Class<? extends ReferenceStorageEngine> engine: engines){
+      int i =0;
+      for(Constructor<?> c : engine.getConstructors()){
+        Class<?>[] params = c.getParameterTypes();
+        if(params.length != 2 || params[1] == Config.class || !StorageEngineConfig.class.isAssignableFrom(params[0])){
+          logger.debug("Skipping ReferenceStorageEngine constructor {} for engine class {} since it doesn't implement a [constructor(StorageEngineConfig, Config)]", c, engine);
+          continue;
+        }
+        availableEngines.put(params[0], (Constructor<? extends ReferenceStorageEngine>) c);
+        i++;
+      }
+      if(i == 0){
+        logger.debug("Skipping registration of ReferenceStorageEngine {} as it doesn't have a constructor with the parameters of (StorangeEngineConfig, Config)", engine.getCanonicalName());
+      }
+    }
+  }
+  
+  public ReferenceStorageEngine getEngine(StorageEngineConfig engineConfig) throws SetupException{
+    ReferenceStorageEngine engine = activeEngines.get(engineConfig);
+    if(engine != null) return engine;
+    Constructor<? extends ReferenceStorageEngine> c = availableEngines.get(engineConfig.getClass());
+    if(c == null) throw new SetupException(String.format("Failure finding StorageEngine constructor for config %s", engineConfig));
+    try {
+      return c.newInstance(engineConfig, config);
+    } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+      Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException)e).getTargetException() : e;
+      if(t instanceof SetupException) throw ((SetupException) t);
+      throw new SetupException(String.format("Failure setting up new storage engine configuration for config %s", engineConfig), t);
+    }
+  }
+  
+
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/rse/RecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/RecordReader.java b/sandbox/prototype/exec/java-exec/rse/RecordReader.java
new file mode 100644
index 0000000..b7840bc
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/rse/RecordReader.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import org.apache.drill.exec.ref.RecordIterator;
+
+/**
+ * Source of records for a read operation.
+ */
+public interface RecordReader {
+
+  /** Returns an iterator over the records provided by this reader. */
+  RecordIterator getIterator();
+
+  /** Prepares the reader for use. */
+  void setup();
+
+  /** Releases whatever resources the reader holds. */
+  void cleanup();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/rse/RecordRecorder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/RecordRecorder.java b/sandbox/prototype/exec/java-exec/rse/RecordRecorder.java
new file mode 100644
index 0000000..9527b0b
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/rse/RecordRecorder.java
@@ -0,0 +1,32 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.io.IOException;
+
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.RunOutcome;
+
+/**
+ * Sink that records are written to during a run.
+ */
+public interface RecordRecorder {
+  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordRecorder.class);
+
+  /**
+   * Prepares the recorder for use.
+   *
+   * @throws IOException if preparation fails
+   */
+  void setup() throws IOException;
+
+  /**
+   * Writes a single record.
+   *
+   * @param pointer the record to write
+   * @return an implementation-defined value (NOTE(review): appears to be an output position - confirm with callers)
+   * @throws IOException if the record cannot be written
+   */
+  long recordRecord(RecordPointer pointer) throws IOException;
+
+  /**
+   * Signals that the run is over.
+   *
+   * @param outcome how the run ended
+   * @throws IOException if finishing fails
+   */
+  void finish(RunOutcome.OutcomeType outcome) throws IOException;
+
+}