You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/02 05:03:12 UTC

[12/13] Merge Jason's SQL updates to work with full exec. Random vector updates including changing to copyFrom Fixes to writable batch. Add an alternative ByteBuf implementation that leverages little endianness.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index dcaf823..289ec4b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -2,35 +2,29 @@ package org.apache.drill.exec.opt;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 
 import org.apache.drill.common.PlanProperties;
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.*;
 import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.logical.data.*;
+import org.apache.drill.common.logical.data.Filter;
 import org.apache.drill.common.logical.data.Project;
-import org.apache.drill.common.logical.data.Scan;
-import org.apache.drill.common.logical.data.SinkOperator;
-import org.apache.drill.common.logical.data.Store;
 import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.exception.OptimizerException;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.config.MockScanPOP;
-import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.config.*;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 
-/**
- * Created with IntelliJ IDEA.
- * User: jaltekruse
- * Date: 6/11/13
- * Time: 5:32 PM
- * To change this template use File | Settings | File Templates.
- */
 public class BasicOptimizer extends Optimizer{
 
     private DrillConfig config;
@@ -58,9 +52,9 @@ public class BasicOptimizer extends Optimizer{
                 System.out.println(pop);
                 physOps.add(pop);
             } catch (OptimizerException e) {
-                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                e.printStackTrace();
             } catch (Throwable throwable) {
-                throwable.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                throwable.printStackTrace();
             }
         }
 
@@ -99,9 +93,11 @@ public class BasicOptimizer extends Optimizer{
                 }
                 else{
                     myObjects = new ArrayList<>();
-                    MockScanPOP.MockColumn[] cols = { new MockScanPOP.MockColumn("blah", MinorType.INT, DataMode.REQUIRED,4,4,4),
-                            new MockScanPOP.MockColumn("blah_2", MinorType.INT, DataMode.REQUIRED,4,4,4)};
-                    myObjects.add(new MockScanPOP.MockScanEntry(50, cols));
+                    MockScanPOP.MockColumn[] cols = {
+                        new MockScanPOP.MockColumn("RED", MinorType.BIGINT, DataMode.REQUIRED, null, null, null),
+                        new MockScanPOP.MockColumn("GREEN", MinorType.BIGINT, DataMode.REQUIRED,null, null, null)
+                    };
+                    myObjects.add(new MockScanPOP.MockScanEntry(100, cols));
                 }
             } catch (IOException e) {
                 e.printStackTrace();
@@ -121,7 +117,22 @@ public class BasicOptimizer extends Optimizer{
 
         @Override
         public PhysicalOperator visitProject(Project project, Object obj) throws OptimizerException {
-            return project.getInput().accept(this, obj);
+          return project.getInput().accept(this, obj);
+//            return new org.apache.drill.exec.physical.config.Project(
+//                Arrays.asList(project.getSelections()), project.iterator().next().accept(this, obj));
         }
+
+      @Override
+      public PhysicalOperator visitFilter(Filter filter, Object obj) throws OptimizerException {
+        TypeProtos.MajorType.Builder b = TypeProtos.MajorType.getDefaultInstance().newBuilderForType();
+        b.setMode(DataMode.REQUIRED);
+        b.setMinorType(MinorType.BIGINT);
+
+        return new SelectionVectorRemover(new org.apache.drill.exec.physical.config.Filter(
+            filter.iterator().next().accept(this, obj), /*filter.getExpr() */
+            new FunctionCall(FunctionDefinition.simple("alternate", new NoArgValidator(),
+                new OutputTypeDeterminer.FixedType(b.build())), null, new ExpressionPosition("asdf", 1)),
+            1.0f));
+      }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
index bf3cd91..18aa484 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
@@ -55,15 +55,15 @@ public abstract class FilterTemplate implements Filterer{
   
   private void filterBatchSV2(int recordCount){
     int svIndex = 0;
-    final int count = recordCount*2;
-    for(int i = 0; i < count; i+=2){
+    final int count = recordCount;
+    for(int i = 0; i < count; i++){
       char index = incomingSelectionVector.getIndex(i);
       if(doEval(i, 0)){
         outgoingSelectionVector.setIndex(svIndex, index);
-        svIndex+=2;
+        svIndex++;
       }
     }
-    outgoingSelectionVector.setRecordCount(svIndex/2);
+    outgoingSelectionVector.setRecordCount(svIndex);
   }
   
   private void filterBatchNoSV(int recordCount){
@@ -72,10 +72,10 @@ public abstract class FilterTemplate implements Filterer{
       
       if(doEval(i, 0)){
         outgoingSelectionVector.setIndex(svIndex, i);
-        svIndex+=2;
+        svIndex++;
       }
     }
-    outgoingSelectionVector.setRecordCount(svIndex/2);
+    outgoingSelectionVector.setRecordCount(svIndex);
   }
   
   protected abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
index 7929296..85d7991 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
@@ -43,6 +43,7 @@ public class VectorRecordMaterializer implements RecordMaterializer{
   }
 
   public QueryWritableBatch convertNext(boolean isLast) {
+    //batch.getWritableBatch().getDef().getRecordCount()
     WritableBatch w = batch.getWritableBatch();
 
     QueryResult header = QueryResult.newBuilder() //

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
index 8d47678..5f15c2d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
@@ -31,8 +31,8 @@ public abstract class ProjectorTemplate implements Projector {
       
       
     case TWO_BYTE:
-      final int count = recordCount*2;
-      for(int i = 0; i < count; i+=2, firstOutputIndex++){
+      final int count = recordCount;
+      for(int i = 0; i < count; i++, firstOutputIndex++){
         doEval(vector2.getIndex(i), firstOutputIndex);
       }
       return recordCount;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java
index 12a1e0a..6a0e2c3 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java
@@ -31,7 +31,7 @@ public abstract class CopierTemplate implements Copier{
     allocateVectors(recordCount);
     int outgoingPosition = 0;
     
-    for(int svIndex = 0; svIndex < recordCount * 2; svIndex++, outgoingPosition++){
+    for(int svIndex = 0; svIndex < recordCount; svIndex++, outgoingPosition++){
       doEval(svIndex, outgoingPosition);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 7b90717..68793b0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -40,7 +40,7 @@ public class RemovingRecordBatch implements RecordBatch{
   private Copier copier;
   private List<ValueVector> outputVectors;
   private VectorHolder vh;
-  
+  private int recordCount;
   
   public RemovingRecordBatch(RecordBatch incoming, FragmentContext context){
     this.incoming = incoming;
@@ -65,7 +65,7 @@ public class RemovingRecordBatch implements RecordBatch{
 
   @Override
   public int getRecordCount() {
-    return incoming.getRecordCount();
+    return recordCount;
   }
 
   @Override
@@ -95,7 +95,7 @@ public class RemovingRecordBatch implements RecordBatch{
 
   @Override
   public IterOutcome next() {
-    
+    recordCount = 0;
     IterOutcome upstream = incoming.next();
     logger.debug("Upstream... {}", upstream);
     switch(upstream){
@@ -114,7 +114,7 @@ public class RemovingRecordBatch implements RecordBatch{
       }
       // fall through.
     case OK:
-      int recordCount = incoming.getRecordCount();
+      recordCount = incoming.getRecordCount();
       copier.copyRecords();
       for(ValueVector v : this.outputVectors){
         ValueVector.Mutator m = v.getMutator();
@@ -195,7 +195,7 @@ public class RemovingRecordBatch implements RecordBatch{
     this.vh = new VectorHolder(outputVectors);
 
     SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(SelectionVectorMode.NONE);
-    for(ValueVector v : outputVectors){
+    for(ValueVector v : incoming){
       bldr.addField(v.getField());
     }
     this.outSchema = bldr.build();
@@ -225,7 +225,7 @@ public class RemovingRecordBatch implements RecordBatch{
       JVar inVV = declareVVSetup("incoming", g, fieldId, vvClass);
       JVar outVV = declareVVSetup("outgoing", g, fieldId, vvClass);
       
-      g.getBlock().add(inVV.invoke("copyValue").arg(inIndex).arg(outIndex).arg(outVV));
+      g.getBlock().add(outVV.invoke("copyFrom").arg(inIndex).arg(outIndex).arg(inVV));
       
       fieldId++;
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index e7f381f..685cc77 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -61,8 +61,13 @@ public class WritableBatch {
     List<ByteBuf> buffers = Lists.newArrayList();
     List<FieldMetadata> metadata = Lists.newArrayList();
 
+    
     for (ValueVector vv : vectors) {
       metadata.add(vv.getMetadata());
+      
+      // don't try to get the buffers if we don't have any records.  It is possible the buffers are dead buffers.
+      if(recordCount == 0) continue;
+      
       for (ByteBuf b : vv.getBuffers()) {
         buffers.add(b);
         b.retain();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
index 33938cc..a38c8e5 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
@@ -43,12 +43,12 @@ public class SelectionVector2 implements Closeable{
     return recordCount;
   }
 
-  public char getIndex(int directIndex){
-    return buffer.getChar(directIndex);
+  public char getIndex(int index){
+    return buffer.getChar(index);
   }
 
-  public void setIndex(int directIndex, char value){
-    buffer.setChar(directIndex, value);
+  public void setIndex(int index, char value){
+    buffer.setChar(index*2, value);
   }
   
   public void allocateNew(int size){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index ad44ff2..40b8edc 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -45,8 +45,12 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
     super(UserRpcConfig.MAPPING, alloc, eventLoopGroup, RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER);
   }
 
-  public void submitQuery(UserResultsListener resultsListener, RunQuery query) throws RpcException {
-    send(queryResultHandler.getWrappedListener(resultsListener), RpcType.RUN_QUERY, query, QueryId.class);
+  public void submitQuery(UserResultsListener resultsListener, RunQuery query) {
+    try{
+      send(queryResultHandler.getWrappedListener(resultsListener), RpcType.RUN_QUERY, query, QueryId.class);  
+    }catch(RpcException ex){
+      resultsListener.submissionFailed(ex);
+    }
   }
 
   public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndpoint endpoint) throws RpcException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
index 1376b7d..910b80e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -57,8 +57,8 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
     return len;
   }
 
-  public void copyValue(int inIndex, int outIndex, BitVector target) {
-    target.mutator.set(outIndex, this.accessor.get(inIndex));
+  public void copyFrom(int inIndex, int outIndex, BitVector from) {
+    this.mutator.set(outIndex, from.accessor.get(inIndex));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index a90382a..0a4614a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -51,6 +51,7 @@ import org.apache.drill.exec.util.AtomicState;
 import org.apache.drill.exec.work.QueryWorkUnit;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 
 /**
@@ -163,7 +164,9 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
   private void parseAndRunLogicalPlan(String json) {
     try {
       LogicalPlan logicalPlan = context.getPlanReader().readLogicalPlan(json);
+      logger.debug("Logical {}", logicalPlan.unparse(DrillConfig.create()));
       PhysicalPlan physicalPlan = convert(logicalPlan);
+      logger.debug("Physical {}", new ObjectMapper().writeValueAsString(physicalPlan));
       runPhysicalPlan(physicalPlan);
     } catch (IOException e) {
       fail("Failure while parsing logical plan.", e);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestEndianess.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestEndianess.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestEndianess.java
new file mode 100644
index 0000000..155b8da
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestEndianess.java
@@ -0,0 +1,24 @@
+package org.apache.drill.exec.memory;
+
+import static org.junit.Assert.assertEquals;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.memory.DirectBufferAllocator;
+import org.junit.Test;
+
+
+
+public class TestEndianess {
+  
+  @Test
+  public void testLittleEndian(){
+    DirectBufferAllocator a = new DirectBufferAllocator();
+    ByteBuf b = a.buffer(4);
+    b.setInt(0, 35);
+    assertEquals((int) b.getByte(0), 35);
+    assertEquals((int) b.getByte(1), 0);
+    assertEquals((int) b.getByte(2), 0);
+    assertEquals((int) b.getByte(3), 0);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/pom.xml b/sandbox/prototype/exec/pom.xml
index df02dea..8c54a19 100644
--- a/sandbox/prototype/exec/pom.xml
+++ b/sandbox/prototype/exec/pom.xml
@@ -15,6 +15,7 @@
   <dependencies>
   </dependencies>
   <modules>
+    <module>bufferl</module>
     <module>java-exec</module>
     <module>ref</module>
   </modules>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/pom.xml b/sandbox/prototype/sqlparser/pom.xml
index 3309d36..9be576e 100644
--- a/sandbox/prototype/sqlparser/pom.xml
+++ b/sandbox/prototype/sqlparser/pom.xml
@@ -26,7 +26,7 @@
 	</repositories>
 
 	<dependencies>
-		<dependency>
+    	<dependency>
 			<groupId>net.hydromatic</groupId>
 			<artifactId>optiq</artifactId>
 			<version>0.4.2</version>
@@ -34,13 +34,32 @@
 		<dependency>
 			<groupId>net.hydromatic</groupId>
 			<artifactId>linq4j</artifactId>
-			<version>0.1.3</version>
+			<version>0.1.2</version>
+		</dependency>
+        <dependency>
+			<groupId>org.apache.drill</groupId>
+			<artifactId>common</artifactId>
+			<version>1.0-SNAPSHOT</version>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.drill.exec</groupId>
 			<artifactId>ref</artifactId>
       <version>${project.version}</version>
+      </dependency>
+      <dependency>
+    	<groupId>org.apache.drill.exec</groupId>
+			<artifactId>java-exec</artifactId>
+      <version>${project.version}</version>
+		</dependency>
+      <dependency>
+    	<groupId>org.apache.drill.exec</groupId>
+			<artifactId>java-exec</artifactId>
+            <version>${project.version}</version>
+			<classifier>tests</classifier>
+			<scope>test</scope>
 		</dependency>
+
+
 		<dependency>
 			<groupId>org.apache.drill.exec</groupId>
 			<artifactId>ref</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java
index 23d7237..602a55d 100644
--- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java
@@ -17,6 +17,7 @@
  ******************************************************************************/
 package org.apache.drill.jdbc;
 
+import java.io.IOException;
 import java.lang.reflect.Type;
 import java.util.Collections;
 import java.util.Map;
@@ -30,46 +31,73 @@ import net.hydromatic.linq4j.expressions.MethodCallExpression;
 
 import net.hydromatic.optiq.*;
 
+import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.logical.StorageEngineConfig;
+import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.ref.rops.DataWriter;
 import org.apache.drill.exec.ref.rse.ClasspathRSE;
 import org.apache.drill.exec.ref.rse.ClasspathRSE.ClasspathInputConfig;
-import org.apache.drill.optiq.*;
 
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.optiq.DrillRel;
+import org.apache.drill.optiq.DrillScan;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.relopt.RelOptTable;
 import org.eigenbase.reltype.RelDataType;
 import org.eigenbase.reltype.RelDataTypeFactory;
 import org.eigenbase.sql.type.SqlTypeName;
 
-/** Optiq Table used by Drill. */
+/**
+ * Optiq Table used by Drill.
+ */
 public class DrillTable extends BaseQueryable<Object>
-    implements TranslatableTable<Object>
-{
+    implements TranslatableTable<Object> {
   private final Schema schema;
   private final String name;
   private final String storageEngineName;
   private final RelDataType rowType;
   public final StorageEngineConfig storageEngineConfig;
   public final Object selection;
+  private boolean useReferenceInterpreter;
 
-  /** Creates a DrillTable. */
+  // full engine connection information
+  public Drillbit bit;
+  public DrillClient client;
+
+  /**
+   * Creates a DrillTable.
+   */
   public DrillTable(Schema schema,
-      Type elementType,
-      Expression expression,
-      RelDataType rowType,
-      String name,
-      StorageEngineConfig storageEngineConfig,
-      Object selection,
-      String storageEngineName
-      ) {
+                    Type elementType,
+                    Expression expression,
+                    RelDataType rowType,
+                    String name,
+                    StorageEngineConfig storageEngineConfig,
+                    Object selection,
+                    String storageEngineName,
+                    boolean useReferenceInterpreter
+  ) {
+
     super(schema.getQueryProvider(), elementType, expression);
+    DrillConfig config = DrillConfig.create();
+    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+    try {
+      bit = new Drillbit(config, serviceSet);
+      client = new DrillClient(config, serviceSet.getCoordinator());
+      bit.run();
+    } catch (IOException e) {
+      System.out.println("Error creating drill client or connecting to drillbit.");
+    } catch (Exception e) {
+      System.out.println("Error creating drill client or connecting to drillbit.");
+    }
     this.schema = schema;
     this.name = name;
     this.rowType = rowType;
     this.storageEngineConfig = storageEngineConfig;
     this.selection = selection;
     this.storageEngineName = storageEngineName;
+    this.useReferenceInterpreter = useReferenceInterpreter;
   }
 
   private static DrillTable createTable(
@@ -78,8 +106,9 @@ public class DrillTable extends BaseQueryable<Object>
       String name,
       StorageEngineConfig storageEngineConfig,
       Object selection,
-      String storageEngineName
-      ) {
+      String storageEngineName,
+      boolean useReferenceInterpreter
+  ) {
     final MethodCallExpression call = Expressions.call(schema.getExpression(),
         BuiltinMethod.DATA_CONTEXT_GET_TABLE.method,
         Expressions.constant(name),
@@ -91,8 +120,8 @@ public class DrillTable extends BaseQueryable<Object>
                     typeFactory.createSqlType(SqlTypeName.VARCHAR),
                     typeFactory.createSqlType(SqlTypeName.ANY))),
             Collections.singletonList("_MAP"));
-      return new DrillTable(schema, Object.class, call, rowType, name,
-          storageEngineConfig, selection, storageEngineName);
+    return new DrillTable(schema, Object.class, call, rowType, name,
+        storageEngineConfig, selection, storageEngineName, useReferenceInterpreter);
   }
 
   @Override
@@ -129,19 +158,32 @@ public class DrillTable extends BaseQueryable<Object>
     return t0 != null ? t0 : t1;
   }
 
-  /** Factory for custom tables in Optiq schema. */
+  public boolean useReferenceInterpreter() {
+    return useReferenceInterpreter;
+  }
+
+  /**
+   * Factory for custom tables in Optiq schema.
+   */
   @SuppressWarnings("UnusedDeclaration")
   public static class Factory implements TableFactory<DrillTable> {
     @Override
     public DrillTable create(Schema schema, String name,
-        Map<String, Object> operand, RelDataType rowType) {
+                             Map<String, Object> operand, RelDataType rowType) {
       final ClasspathRSE.ClasspathRSEConfig rseConfig =
           new ClasspathRSE.ClasspathRSEConfig();
       final ClasspathInputConfig inputConfig = new ClasspathInputConfig();
       inputConfig.path = last((String) operand.get("path"), "/donuts.json");
+      boolean useReferenceInterpreter;
+      if (operand.get("useReferenceInterpreter") != null){
+        useReferenceInterpreter = operand.get("useReferenceInterpreter").equals("true") ? true : false;
+      }
+      else{
+        useReferenceInterpreter = false;
+      }
       inputConfig.type = DataWriter.ConverterType.JSON;
       return createTable(schema.getTypeFactory(), (MutableSchema) schema, name,
-          rseConfig, inputConfig, "donuts-json");
+          rseConfig, inputConfig, "donuts-json", useReferenceInterpreter);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrill.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrill.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrill.java
deleted file mode 100644
index 0a225cf..0000000
--- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrill.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.optiq;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.*;
-
-import net.hydromatic.linq4j.AbstractEnumerable;
-import net.hydromatic.linq4j.Enumerable;
-import net.hydromatic.linq4j.Enumerator;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.logical.LogicalPlan;
-import org.apache.drill.exec.ref.IteratorRegistry;
-import org.apache.drill.exec.ref.ReferenceInterpreter;
-import org.apache.drill.exec.ref.RunOutcome;
-import org.apache.drill.exec.ref.eval.BasicEvaluatorFactory;
-import org.apache.drill.exec.ref.rse.RSERegistry;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-/**
- * Runtime helper that executes a Drill query and converts it into an
- * {@link Enumerable}.
- */
-public class EnumerableDrill<E>
-    extends AbstractEnumerable<E>
-    implements Enumerable<E> {
-  private final LogicalPlan plan;
-  final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(100);
-  final DrillConfig config;
-  private final String holder;
-  private final List<String> fields;
-
-  private static final ObjectMapper mapper = createMapper();
-
-  /** Creates a DrillEnumerable.
-   *
-   * @param plan Logical plan
-   * @param clazz Type of elements returned from enumerable
-   * @param fields Names of fields, or null to return the whole blob
-   */
-  public EnumerableDrill(DrillConfig config, LogicalPlan plan, Class<E> clazz,
-      List<String> fields) {
-    this.plan = plan;
-    this.config = config;
-    this.holder = null;
-    this.fields = fields;
-    config.setSinkQueues(0, queue);
-  }
-
-  /** Creates a DrillEnumerable from a plan represented as a string. Each record
-   * returned is a {@link JsonNode}. */
-  public static <E> EnumerableDrill<E> of(String plan,
-      final List<String> fieldNames, Class<E> clazz) {
-    DrillConfig config = DrillConfig.create();
-    final LogicalPlan parse = LogicalPlan.parse(config, plan);
-    return new EnumerableDrill<>(config, parse, clazz, fieldNames);
-  }
-
-  /** Runs the plan as a background task. */
-  Future<Collection<RunOutcome>> runPlan(
-      CompletionService<Collection<RunOutcome>> service) {
-    IteratorRegistry ir = new IteratorRegistry();
-    DrillConfig config = DrillConfig.create();
-    config.setSinkQueues(0, queue);
-    final ReferenceInterpreter i =
-        new ReferenceInterpreter(plan, ir, new BasicEvaluatorFactory(ir),
-            new RSERegistry(config));
-    try {
-      i.setup();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return service.submit(
-        new Callable<Collection<RunOutcome>>() {
-          @Override
-          public Collection<RunOutcome> call() throws Exception {
-            Collection<RunOutcome> outcomes = i.run();
-
-            for (RunOutcome outcome : outcomes) {
-              System.out.println("============");
-              System.out.println(outcome);
-              if (outcome.outcome == RunOutcome.OutcomeType.FAILED
-                  && outcome.exception != null) {
-                outcome.exception.printStackTrace();
-              }
-            }
-            return outcomes;
-          }
-        });
-  }
-
-  @Override
-  public Enumerator<E> enumerator() {
-    // TODO: use a completion service from the container
-    final ExecutorCompletionService<Collection<RunOutcome>> service =
-        new ExecutorCompletionService<Collection<RunOutcome>>(
-            new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS,
-                new LinkedBlockingDeque<Runnable>(10)));
-
-    // Run the plan using an executor. It runs in a different thread, writing
-    // results to our queue.
-    //
-    // TODO: use the result of task, and check for exceptions
-    final Future<Collection<RunOutcome>> task = runPlan(service);
-
-    return new JsonEnumerator(queue, fields);
-  }
-
-  private static ObjectMapper createMapper() {
-    return new ObjectMapper();
-  }
-
-  /** Converts a JSON document, represented as an array of bytes, into a Java
-   * object (consisting of Map, List, String, Integer, Double, Boolean). */
-  static Object parseJson(byte[] bytes) {
-    try {
-      return wrapper(mapper.readTree(bytes));
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /** Converts a JSON node to Java objects ({@link List}, {@link Map},
-   * {@link String}, {@link Integer}, {@link Double}, {@link Boolean}. */
-  static Object wrapper(JsonNode node) {
-    switch (node.asToken()) {
-    case START_OBJECT:
-      return map((ObjectNode) node);
-    case START_ARRAY:
-      return array((ArrayNode) node);
-    case VALUE_STRING:
-      return node.asText();
-    case VALUE_NUMBER_INT:
-      return node.asInt();
-    case VALUE_NUMBER_FLOAT:
-      return node.asDouble();
-    case VALUE_TRUE:
-      return Boolean.TRUE;
-    case VALUE_FALSE:
-      return Boolean.FALSE;
-    case VALUE_NULL:
-      return null;
-    default:
-      throw new AssertionError("unexpected: " + node + ": " + node.asToken());
-    }
-  }
-
-  private static List<Object> array(ArrayNode node) {
-    final List<Object> list = new ArrayList<>();
-    for (JsonNode jsonNode : node) {
-      list.add(wrapper(jsonNode));
-    }
-    return Collections.unmodifiableList(list);
-  }
-
-  private static SortedMap<String, Object> map(ObjectNode node) {
-    // TreeMap makes the results deterministic.
-    final TreeMap<String, Object> map = new TreeMap<>();
-    final Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
-    while (fields.hasNext()) {
-      Map.Entry<String, JsonNode> next = fields.next();
-      map.put(next.getKey(), wrapper(next.getValue()));
-    }
-    return Collections.unmodifiableSortedMap(map);
-  }
-
-  private static class JsonEnumerator implements Enumerator {
-    private final BlockingQueue<Object> queue;
-    private final String holder;
-    private final List<String> fields;
-    private Object current;
-
-    public JsonEnumerator(BlockingQueue<Object> queue, List<String> fields) {
-      this.queue = queue;
-      this.holder = null;
-      this.fields = fields;
-    }
-
-    public Object current() {
-      return current;
-    }
-
-    public boolean moveNext() {
-      try {
-        Object o = queue.take();
-        if (o instanceof RunOutcome.OutcomeType) {
-          switch ((RunOutcome.OutcomeType) o) {
-          case SUCCESS:
-            return false; // end of data
-          case CANCELED:
-            throw new RuntimeException("canceled");
-          case FAILED:
-          default:
-            throw new RuntimeException("failed");
-          }
-        } else {
-          Object o1 = parseJson((byte[]) o);
-          if (holder != null) {
-            o1 = ((Map<String, Object>) o1).get(holder);
-          }
-          if (fields == null) {
-            current = o1;
-          } else {
-            final Map<String, Object> map = (Map<String, Object>) o1;
-            if (fields.size() == 1) {
-              current = map.get(fields.get(0));
-            } else {
-              Object[] os = new Object[fields.size()];
-              for (int i = 0; i < os.length; i++) {
-                os[i] = map.get(fields.get(i));
-              }
-              current = os;
-            }
-          }
-          return true;
-        }
-      } catch (InterruptedException e) {
-        Thread.interrupted();
-        throw new RuntimeException(e);
-      }
-    }
-
-    public void reset() {
-      throw new UnsupportedOperationException();
-    }
-  }
-}
-
-// End EnumerableDrill.java

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillFullEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillFullEngine.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillFullEngine.java
new file mode 100644
index 0000000..8c41b99
--- /dev/null
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillFullEngine.java
@@ -0,0 +1,85 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.optiq;
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import net.hydromatic.linq4j.AbstractEnumerable;
+import net.hydromatic.linq4j.Enumerable;
+import net.hydromatic.linq4j.Enumerator;
+import net.hydromatic.optiq.DataContext;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.jdbc.DrillTable;
+import org.apache.drill.sql.client.full.DrillFullImpl;
+import org.apache.drill.sql.client.ref.DrillRefImpl;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Runtime helper that executes a Drill query and converts it into an {@link Enumerable}.
+ */
+public class EnumerableDrillFullEngine<E> extends AbstractEnumerable<E> implements Enumerable<E> {
+  private final String plan;
+  final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(100);
+  final DrillConfig config;
+  private final List<String> fields;
+  private DataContext drillConnectionDataContext;
+
+  /**
+   * Creates a DrillEnumerable.
+   * 
+   * @param plan
+   *          Logical plan
+   * @param clazz
+   *          Type of elements returned from enumerable
+   * @param fields
+   *          Names of fields, or null to return the whole blob
+   */
+  public EnumerableDrillFullEngine(DrillConfig config, String plan, Class<E> clazz, List<String> fields, DataContext drillConnectionDataContext) {
+    this.plan = plan;
+    this.config = config;
+    this.fields = fields;
+    this.drillConnectionDataContext = drillConnectionDataContext;
+    config.setSinkQueues(0, queue);
+  }
+
+  /**
+   * Creates a DrillEnumerable from a plan represented as a string. Each record returned is a {@link JsonNode}.
+   */
+  public static <E> EnumerableDrillFullEngine<E> of(String plan, final List<String> fieldNames, Class<E> clazz,
+      DataContext drillTable) {
+    DrillConfig config = DrillConfig.create();
+    return new EnumerableDrillFullEngine<>(config, plan, clazz, fieldNames, drillTable);
+  }
+
+  @Override
+  public Enumerator<E> enumerator() {
+    DrillTable table = (DrillTable) drillConnectionDataContext.getSubSchema("DONUTS").getTable("DONUTS", Object.class);
+    if(table.useReferenceInterpreter()){
+      DrillRefImpl<E> impl = new DrillRefImpl<E>(plan, config, fields, queue);
+      return impl.enumerator(table);
+    } else {
+      DrillFullImpl<E> impl = new DrillFullImpl<E>(plan, config, fields);
+      return impl.enumerator(table);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillRel.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillRel.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillRel.java
index ce21c92..567378d 100644
--- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillRel.java
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillRel.java
@@ -20,10 +20,12 @@ package org.apache.drill.optiq;
 import net.hydromatic.linq4j.expressions.*;
 import net.hydromatic.linq4j.function.Function1;
 import net.hydromatic.linq4j.function.Functions;
+import net.hydromatic.optiq.DataContext;
 import net.hydromatic.optiq.impl.java.JavaTypeFactory;
 import net.hydromatic.optiq.rules.java.*;
 
 import org.apache.drill.common.util.Hook;
+import org.apache.drill.optiq.EnumerableDrillFullEngine;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.rel.SingleRel;
 import org.eigenbase.relopt.*;
@@ -44,7 +46,7 @@ public class EnumerableDrillRel extends SingleRel implements EnumerableRel {
   private static final Logger LOG =
       LoggerFactory.getLogger(EnumerableDrillRel.class);
 
-  private static final Function1<String,Expression> TO_LITERAL =
+  private static final Function1<String, Expression> TO_LITERAL =
       new Function1<String, Expression>() {
         @Override
         public Expression apply(String a0) {
@@ -59,16 +61,16 @@ public class EnumerableDrillRel extends SingleRel implements EnumerableRel {
   static {
     try {
       OF_METHOD =
-          EnumerableDrill.class.getMethod("of", String.class, List.class, Class.class);
+          EnumerableDrillFullEngine.class.getMethod("of", String.class, List.class, Class.class, DataContext.class);
+      //EnumerableDrillFullEngine.class.getMethod("of", String.class, List.class, Class.class);
     } catch (NoSuchMethodException e) {
       throw new RuntimeException(e);
     }
   }
 
   public EnumerableDrillRel(RelOptCluster cluster,
-      RelTraitSet traitSet,
-      RelNode input)
-  {
+                            RelTraitSet traitSet,
+                            RelNode input) {
     super(cluster, traitSet, input);
     assert getConvention() instanceof EnumerableConvention;
     assert input.getConvention() == DrillRel.CONVENTION;
@@ -100,7 +102,11 @@ public class EnumerableDrillRel extends SingleRel implements EnumerableRel {
     drillImplementor.go(input);
     String plan = drillImplementor.getJsonString();
     Hook.LOGICAL_PLAN.run(plan);
+
+    // not quite sure where this list was supposed to be set earlier, leaving it null got me back the full result set
+
     final List<String> fieldNameList = RelOptUtil.getFieldNameList(rowType);
+    //final List<String> fieldNameList = null;
     return new BlockBuilder()
         .append(
             Expressions.call(
@@ -112,7 +118,9 @@ public class EnumerableDrillRel extends SingleRel implements EnumerableRel {
                     Expressions.newArrayInit(
                         String.class,
                         Functions.apply(fieldNameList, TO_LITERAL))),
-                Expressions.constant(Object.class)))
+                Expressions.constant(Object.class),
+                Expressions.variable(DataContext.class, "root")
+            ))
         .toBlock();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillRule.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillRule.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillRule.java
index fa1bd4f..6057163 100644
--- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillRule.java
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillRule.java
@@ -22,20 +22,14 @@ import org.eigenbase.rel.RelNode;
 import org.eigenbase.rel.convert.ConverterRule;
 
 /**
- * Rule that converts any Drill relational expression to enumerable format by
- * adding a {@link EnumerableDrillRel}.
+ * Rule that converts any Drill relational expression to enumerable format by adding a {@link EnumerableDrillRel}.
  */
 public class EnumerableDrillRule extends ConverterRule {
-  public static final EnumerableDrillRule ARRAY_INSTANCE =
-      new EnumerableDrillRule(EnumerableConvention.ARRAY);
-  public static final EnumerableDrillRule CUSTOM_INSTANCE =
-      new EnumerableDrillRule(EnumerableConvention.CUSTOM);
+  public static final EnumerableDrillRule ARRAY_INSTANCE = new EnumerableDrillRule(EnumerableConvention.ARRAY);
+  public static final EnumerableDrillRule CUSTOM_INSTANCE = new EnumerableDrillRule(EnumerableConvention.CUSTOM);
 
   private EnumerableDrillRule(EnumerableConvention outConvention) {
-    super(RelNode.class,
-        DrillRel.CONVENTION,
-        outConvention,
-        "EnumerableDrillRule." + outConvention);
+    super(RelNode.class, DrillRel.CONVENTION, outConvention, "EnumerableDrillRule." + outConvention);
   }
 
   @Override
@@ -46,9 +40,7 @@ public class EnumerableDrillRule extends ConverterRule {
   @Override
   public RelNode convert(RelNode rel) {
     assert rel.getTraitSet().contains(DrillRel.CONVENTION);
-    return new EnumerableDrillRel(rel.getCluster(),
-        rel.getTraitSet().replace(getOutConvention()),
-        rel);
+    return new EnumerableDrillRel(rel.getCluster(), rel.getTraitSet().replace(getOutConvention()), rel);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchListener.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchListener.java
new file mode 100644
index 0000000..b3a7e35
--- /dev/null
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchListener.java
@@ -0,0 +1,48 @@
+package org.apache.drill.sql.client.full;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.UserResultsListener;
+
+public class BatchListener implements UserResultsListener {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchListener.class);
+
+  private RpcException ex;
+  private volatile boolean completed = false;
+  
+  final BlockingQueue<QueryResultBatch> queue = new ArrayBlockingQueue<>(100);
+
+  @Override
+  public void submissionFailed(RpcException ex) {
+    this.ex = ex;
+    completed = true;
+  }
+
+  @Override
+  public void resultArrived(QueryResultBatch result) {
+    logger.debug("Result arrived {}", result);
+    queue.add(result);
+    if(result.getHeader().getIsLastChunk()){
+      completed = true;
+    }
+  }
+
+  public boolean completed(){
+    return completed;
+  }
+
+  public QueryResultBatch getNext() throws RpcException, InterruptedException{
+    if(ex != null) throw ex;
+    if(completed && queue.isEmpty()){
+      return null;
+    }else{
+      return queue.take();
+    }
+  }
+
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchLoaderMap.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchLoaderMap.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchLoaderMap.java
new file mode 100644
index 0000000..4eb243d
--- /dev/null
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchLoaderMap.java
@@ -0,0 +1,185 @@
+package org.apache.drill.sql.client.full;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import jline.internal.Preconditions;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.vector.ValueVector;
+
+import com.fasterxml.jackson.core.JsonGenerationException;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import com.google.common.collect.Maps;
+
+@JsonSerialize(using = BatchLoaderMap.Se.class)
+public class BatchLoaderMap implements Map<String, Object> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchLoaderMap.class);
+
+  private BatchListener listener;
+  private RecordBatchLoader loader;
+  private final List<String> requestedFields;
+  private final Map<String, ValueVector> fields = Maps.newHashMap();
+  private int index;
+  private Object[] objArr;
+
+  public BatchLoaderMap(List<String> requestedFields, BatchListener listener, DrillbitContext context) {
+    this.listener = listener;
+    this.requestedFields = requestedFields;
+    this.objArr = new Object[requestedFields.size()];
+    this.loader = new RecordBatchLoader(context.getAllocator());
+  }
+
+  private void load(QueryResultBatch batch) throws SchemaChangeException {
+    boolean schemaChanged = loader.load(batch.getHeader().getDef(), batch.getData());
+    if (schemaChanged) {
+      fields.clear();
+      for (ValueVector v : loader) {
+        fields.put(v.getField().getName(), v);
+      }
+    } else {
+      logger.debug("Schema didn't change. {}", batch);
+    }
+  }
+
+  public boolean next() throws SchemaChangeException, RpcException, InterruptedException {
+    index++;
+    if (index < loader.getRecordCount()) {
+      return true;
+    } else {
+      logger.debug("Starting next query result batch.");
+      QueryResultBatch qrb;
+      while( (qrb = listener.getNext()) != null && !qrb.hasData()){
+        qrb = listener.getNext();
+      }
+      
+      if (qrb == null) {
+        logger.debug("No more batches found.");
+        index = -1;
+        return false;
+      } else {
+        load(qrb);
+        logger.debug("New batch found and loaded. {}", qrb.getHeader().getDef());
+        index = 0;
+        return true;
+      }
+
+    }
+  }
+
+  public Object getCurrentAsObjectArray() {
+    
+    for (int i = 0; i < requestedFields.size(); i++) {
+      ValueVector vv = fields.get(requestedFields.get(i));
+      if (vv == null) {
+        objArr[i] = null;
+      } else {
+        objArr[i] = vv.getAccessor().getObject(index);
+      }
+    }
+    return objArr;
+
+  }
+
+  @Override
+  public int size() {
+    return fields.size();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return fields.isEmpty();
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    if (!(key instanceof String))
+      return false;
+    return fields.containsKey(key);
+  }
+
+  @Override
+  public boolean containsValue(Object value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Object get(Object key) {
+    ValueVector v = fields.get(key);
+    Preconditions.checkNotNull(v);
+    return v.getAccessor().getObject(index);
+  }
+
+  @Override
+  public Object put(String key, Object value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Object remove(Object key) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void putAll(Map<? extends String, ? extends Object> m) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void clear() {
+  }
+
+  @Override
+  public Set<String> keySet() {
+    return fields.keySet();
+  }
+
+  @Override
+  public Collection<Object> values() {
+    throw new UnsupportedOperationException();
+  }
+
+  public String toString() {
+    try {
+      return new ObjectMapper().writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Set<java.util.Map.Entry<String, Object>> entrySet() {
+    throw new UnsupportedOperationException();
+  }
+
+  public static class Se extends StdSerializer<BatchLoaderMap> {
+
+    public Se() {
+      super(BatchLoaderMap.class);
+    }
+
+    @Override
+    public void serialize(BatchLoaderMap value, JsonGenerator jgen, SerializerProvider provider) throws IOException,
+        JsonGenerationException {
+      jgen.writeStartObject();
+      assert value.index > -1 && value.index < value.loader.getRecordCount();
+      for (Map.Entry<String, ValueVector> me : value.fields.entrySet()) {
+        jgen.writeObjectField(me.getKey(), me.getValue().getAccessor().getObject(value.index));
+      }
+      jgen.writeEndObject();
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/DrillFullImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/DrillFullImpl.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/DrillFullImpl.java
new file mode 100644
index 0000000..0974365
--- /dev/null
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/DrillFullImpl.java
@@ -0,0 +1,64 @@
+package org.apache.drill.sql.client.full;
+
+import java.util.List;
+
+import net.hydromatic.linq4j.Enumerator;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.jdbc.DrillTable;
+
+public class DrillFullImpl<E>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFullImpl.class);
+
+  private final String plan;
+  final DrillConfig config;
+  private final List<String> fields;
+
+  
+  public DrillFullImpl(String plan, DrillConfig config, List<String> fields) {
+    super();
+    this.plan = plan;
+    this.config = config;
+    this.fields = fields;
+  }
+
+  public Enumerator<E> enumerator(DrillTable table) {
+    
+    BatchListener listener = new BatchListener();
+
+    // TODO: use a completion service from the container
+    QueryRequestRunner runner = new QueryRequestRunner(plan, table.client, listener);
+    runner.start();
+    
+    return (Enumerator<E>) new ResultEnumerator(listener, table.bit.getContext(),fields);
+    
+  }
+  
+  public class QueryRequestRunner extends Thread{
+    final String plan;
+    final DrillClient client;
+    final BatchListener listener;
+    
+    public QueryRequestRunner(String plan, DrillClient client, BatchListener listener) {
+      super();
+      this.setDaemon(true);
+      this.plan = plan;
+      this.client = client;
+      this.listener = listener;
+    }
+
+    @Override
+    public void run() {
+      try {
+        client.connect();
+        client.runQuery(UserProtos.QueryType.LOGICAL, plan, listener);
+      } catch (RpcException e) {
+        listener.submissionFailed(e);
+      }
+      
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/ResultEnumerator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/ResultEnumerator.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/ResultEnumerator.java
new file mode 100644
index 0000000..8839e92
--- /dev/null
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/ResultEnumerator.java
@@ -0,0 +1,48 @@
+package org.apache.drill.sql.client.full;
+
+import java.util.List;
+import java.util.Map;
+
+import net.hydromatic.linq4j.Enumerator;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.server.DrillbitContext;
+
+/**
+   * Enumerator used for full execution engine.
+   */
+  class ResultEnumerator implements Enumerator<Object> {
+
+    private final BatchLoaderMap loaderMap;
+    private Object current;
+    
+    public ResultEnumerator(BatchListener listener, DrillbitContext context, List<String> fields) {
+      this.loaderMap = new BatchLoaderMap(fields, listener, context);
+    }
+
+    public Object current() {
+      return current;
+    }
+
+    public boolean moveNext() {
+      
+      try {
+        boolean succ = loaderMap.next();
+        if(succ){
+          current = loaderMap.getCurrentAsObjectArray();          
+        }
+        return succ;
+        
+      } catch (InterruptedException e) {
+        Thread.interrupted();
+        throw new RuntimeException(e);
+      } catch (RpcException | SchemaChangeException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    public void reset() {
+      throw new UnsupportedOperationException();
+    }
+  }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/ref/DrillRefImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/ref/DrillRefImpl.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/ref/DrillRefImpl.java
new file mode 100644
index 0000000..651dd69
--- /dev/null
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/ref/DrillRefImpl.java
@@ -0,0 +1,240 @@
+package org.apache.drill.sql.client.ref;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import net.hydromatic.linq4j.Enumerator;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.exec.ref.IteratorRegistry;
+import org.apache.drill.exec.ref.ReferenceInterpreter;
+import org.apache.drill.exec.ref.RunOutcome;
+import org.apache.drill.exec.ref.eval.BasicEvaluatorFactory;
+import org.apache.drill.exec.ref.rse.RSERegistry;
+import org.apache.drill.jdbc.DrillTable;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class DrillRefImpl<E> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRefImpl.class);
+
+  private static final ObjectMapper mapper = createMapper();
+  
+  private final String plan;
+  final BlockingQueue<Object> queue;
+  final DrillConfig config;
+  private final List<String> fields;
+
+  
+  public DrillRefImpl(String plan, DrillConfig config, List<String> fields, BlockingQueue<Object> queue) {
+    super();
+    this.plan = plan;
+    this.config = config;
+    this.fields = fields;
+    this.queue = queue;
+  }
+  
+  
+  private static ObjectMapper createMapper() {
+    return new ObjectMapper();
+  }
+
+  /**
+   * Enumerator used for reference interpreter
+   */
+  private static class JsonEnumerator implements Enumerator {
+    private final BlockingQueue<Object> queue;
+    private final String holder;
+    private final List<String> fields;
+    private Object current;
+
+    public JsonEnumerator(BlockingQueue<Object> queue, List<String> fields) {
+      this.queue = queue;
+      this.holder = null;
+      this.fields = fields;
+    }
+
+    public Object current() {
+      return current;
+    }
+
+    public boolean moveNext() {
+      try {
+        Object o = queue.take();
+        if (o instanceof RunOutcome.OutcomeType) {
+          switch ((RunOutcome.OutcomeType) o) {
+            case SUCCESS:
+              return false; // end of data
+            case CANCELED:
+              throw new RuntimeException("canceled");
+            case FAILED:
+            default:
+              throw new RuntimeException("failed");
+          }
+        } else {
+          Object o1 = parseJson((byte[]) o);
+          if (holder != null) {
+            o1 = ((Map<String, Object>) o1).get(holder);
+          }
+          if (fields == null) {
+            current = o1;
+          } else {
+            final Map<String, Object> map = (Map<String, Object>) o1;
+            if (fields.size() == 1) {
+              current = map.get(fields.get(0));
+            } else {
+              Object[] os = new Object[fields.size()];
+              for (int i = 0; i < os.length; i++) {
+                os[i] = map.get(fields.get(i));
+              }
+              current = os;
+            }
+          }
+          return true;
+        }
+      } catch (InterruptedException e) {
+        Thread.interrupted();
+        throw new RuntimeException(e);
+      }
+    }
+
+    public void reset() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+
+  /**
+   * Runs the plan as a background task.
+   */
+  Future<Collection<RunOutcome>> runRefInterpreterPlan(
+      CompletionService<Collection<RunOutcome>> service) {
+    LogicalPlan parsedPlan = LogicalPlan.parse(DrillConfig.create(), plan);
+    IteratorRegistry ir = new IteratorRegistry();
+    DrillConfig config = DrillConfig.create();
+    config.setSinkQueues(0, queue);
+    final ReferenceInterpreter i =
+        new ReferenceInterpreter(parsedPlan, ir, new BasicEvaluatorFactory(ir),
+            new RSERegistry(config));
+    try {
+      i.setup();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return service.submit(
+        new Callable<Collection<RunOutcome>>() {
+          @Override
+          public Collection<RunOutcome> call() throws Exception {
+            Collection<RunOutcome> outcomes = i.run();
+
+            for (RunOutcome outcome : outcomes) {
+              System.out.println("============");
+              System.out.println(outcome);
+              if (outcome.outcome == RunOutcome.OutcomeType.FAILED
+                  && outcome.exception != null) {
+                outcome.exception.printStackTrace();
+              }
+            }
+            return outcomes;
+          }
+        });
+  }
+
+  
+  
+  public Enumerator<E> enumerator(DrillTable table) {
+    // TODO: use a completion service from the container
+    final ExecutorCompletionService<Collection<RunOutcome>> service = new ExecutorCompletionService<Collection<RunOutcome>>(
+        new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(10)));
+
+    // Run the plan using an executor. It runs in a different thread, writing
+    // results to our queue.
+    //
+    // TODO: use the result of task, and check for exceptions
+    final Future<Collection<RunOutcome>> task = runRefInterpreterPlan(service);
+
+    return new JsonEnumerator(queue, fields);
+
+  }
+  
+  /**
+   * Converts a JSON document, represented as an array of bytes, into a Java
+   * object (consisting of Map, List, String, Integer, Double, Boolean).
+   */
+  static Object parseJson(byte[] bytes) {
+    try {
+      return wrapper(mapper.readTree(bytes));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+  
+
+
+  /**
+   * Converts a JSON node to Java objects ({@link List}, {@link Map},
+   * {@link String}, {@link Integer}, {@link Double}, {@link Boolean}.
+   */
+  static Object wrapper(JsonNode node) {
+    switch (node.asToken()) {
+      case START_OBJECT:
+        return map((ObjectNode) node);
+      case START_ARRAY:
+        return array((ArrayNode) node);
+      case VALUE_STRING:
+        return node.asText();
+      case VALUE_NUMBER_INT:
+        return node.asInt();
+      case VALUE_NUMBER_FLOAT:
+        return node.asDouble();
+      case VALUE_TRUE:
+        return Boolean.TRUE;
+      case VALUE_FALSE:
+        return Boolean.FALSE;
+      case VALUE_NULL:
+        return null;
+      default:
+        throw new AssertionError("unexpected: " + node + ": " + node.asToken());
+    }
+  }
+
+  private static List<Object> array(ArrayNode node) {
+    final List<Object> list = new ArrayList<>();
+    for (JsonNode jsonNode : node) {
+      list.add(wrapper(jsonNode));
+    }
+    return Collections.unmodifiableList(list);
+  }
+
+  private static SortedMap<String, Object> map(ObjectNode node) {
+    // TreeMap makes the results deterministic.
+    final TreeMap<String, Object> map = new TreeMap<>();
+    final Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
+    while (fields.hasNext()) {
+      Map.Entry<String, JsonNode> next = fields.next();
+      map.put(next.getKey(), wrapper(next.getValue()));
+    }
+    return Collections.unmodifiableSortedMap(map);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java
index ecee65b..a30ce65 100644
--- a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java
+++ b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java
@@ -24,10 +24,13 @@ import org.apache.drill.common.util.Hook;
 import java.sql.*;
 import java.util.Properties;
 
+import static org.junit.Assert.assertEquals;
+
 /**
  * Fluent interface for writing JDBC and query-planning tests.
  */
 public class JdbcAssert {
+
   public static One withModel(String model, String schema) {
     final Properties info = new Properties();
     info.setProperty("schema", schema);
@@ -35,10 +38,12 @@ public class JdbcAssert {
     return new One(info);
   }
 
-  static String toString(ResultSet resultSet) throws SQLException {
+  static String toString(ResultSet resultSet, int expectedRecordCount) throws SQLException {
     StringBuilder buf = new StringBuilder();
+    int total = 0, n;
     while (resultSet.next()) {
-      int n = resultSet.getMetaData().getColumnCount();
+      n = resultSet.getMetaData().getColumnCount();
+      total++;
       String sep = "";
       for (int i = 1; i <= n; i++) {
         buf.append(sep)
@@ -49,9 +54,26 @@ public class JdbcAssert {
       }
       buf.append("\n");
     }
+    if (false && expectedRecordCount > 0){
+      assertEquals("Expected record count not matched.", total, expectedRecordCount);
+    }
     return buf.toString();
   }
 
+  static String toString(ResultSet resultSet) throws SQLException {
+    return toString(resultSet, -1);
+  }
+
+  static int countRecords(ResultSet resultSet) throws SQLException {
+    StringBuilder buf = new StringBuilder();
+    int total = 0, n;
+    while (resultSet.next()) {
+      n = resultSet.getMetaData().getColumnCount();
+      total += n;
+    }
+    return total;
+  }
+
   public static class One {
     private final Properties info;
     private final ConnectionFactory connectionFactory;
@@ -82,6 +104,8 @@ public class JdbcAssert {
         }
       }
     }
+
+
   }
 
   public static class Two {
@@ -113,6 +137,27 @@ public class JdbcAssert {
       }
     }
 
+    public Two displayResults(int recordCount) throws Exception {
+      Connection connection = null;
+      Statement statement = null;
+      try {
+        connection = connectionFactory.createConnection();
+        statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql);
+        // record count check is done in toString method
+        System.out.println(JdbcAssert.toString(resultSet, recordCount));
+        resultSet.close();
+        return this;
+      } finally {
+        if (statement != null) {
+          statement.close();
+        }
+        if (connection != null) {
+          connection.close();
+        }
+      }
+    }
+
     public void planContains(String expected) {
       final String[] plan0 = {null};
       Connection connection = null;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java
index 2d57732..26c7a06 100644
--- a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java
+++ b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java
@@ -21,45 +21,85 @@ import com.google.common.base.Function;
 
 import junit.framework.TestCase;
 
+import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.jdbc.DrillTable;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
 
 import java.sql.*;
 
-/** Unit tests for Drill's JDBC driver. */
+/**
+ * Unit tests for Drill's JDBC driver.
+ */
 public class JdbcTest extends TestCase {
+
+  // Determine if we are in Eclipse Debug mode.
+  static final boolean IS_DEBUG = java.lang.management.ManagementFactory.getRuntimeMXBean().getInputArguments().toString().indexOf("-agentlib:jdwp") > 0;
+
+  // Set a timeout unless we're debugging.
+  @Rule public TestRule globalTimeout = IS_DEBUG ? new TestName() : new Timeout(10000);
+
+  
   private static final String MODEL =
       "{\n"
-      + "  version: '1.0',\n"
-      + "   schemas: [\n"
-      + "     {\n"
-      + "       name: 'DONUTS',\n"
-      + "       tables: [\n"
-      + "         {\n"
-      + "           name: 'DONUTS',\n"
-      + "           type: 'custom',\n"
-      + "           factory: '" + DrillTable.Factory.class.getName() + "'\n,"
-      + "           operand: {\n"
-      + "             path: '/donuts.json'\n"
-      + "           }\n"
-      + "         }\n"
-      + "       ]\n"
-      + "     }\n"
-      + "   ]\n"
-      + "}";
+          + "  version: '1.0',\n"
+          + "   schemas: [\n"
+          + "     {\n"
+          + "       name: 'DONUTS',\n"
+          + "       tables: [\n"
+          + "         {\n"
+          + "           name: 'DONUTS',\n"
+          + "           type: 'custom',\n"
+          + "           factory: '" + DrillTable.Factory.class.getName() + "'\n,"
+          + "           operand: {\n"
+          + "             path: '/donuts.json',\n"
+          + "             useReferenceInterpreter: 'true'\n"
+          + "           }\n"
+          + "         }\n"
+          + "       ]\n"
+          + "     }\n"
+          + "   ]\n"
+          + "}";
+
+  private static final String MODEL_FULL_ENGINE =
+      "{\n"
+          + "  version: '1.0',\n"
+          + "   schemas: [\n"
+          + "     {\n"
+          + "       name: 'DONUTS',\n"
+          + "       tables: [\n"
+          + "         {\n"
+          + "           name: 'DONUTS',\n"
+          + "           type: 'custom',\n"
+          + "           factory: '" + DrillTable.Factory.class.getName() + "'\n,"
+          + "           operand: {\n"
+          + "             path: '/donuts.json'\n"
+          + "           }\n"
+          + "         }\n"
+          + "       ]\n"
+          + "     }\n"
+          + "   ]\n"
+          + "}";
 
   private static final String EXPECTED =
       "_MAP={batters={batter=[{id=1001, type=Regular}, {id=1002, type=Chocolate}, {id=1003, type=Blueberry}, {id=1004, type=Devil's Food}]}, id=0001, name=Cake, ppu=0.55, sales=35, topping=[{id=5001, type=None}, {id=5002, type=Glazed}, {id=5005, type=Sugar}, {id=5007, type=Powdered Sugar}, {id=5006, type=Chocolate with Sprinkles}, {id=5003, type=Chocolate}, {id=5004, type=Maple}], type=donut}\n"
-      + "_MAP={batters={batter=[{id=1001, type=Regular}]}, id=0002, name=Raised, ppu=0.69, sales=145, topping=[{id=5001, type=None}, {id=5002, type=Glazed}, {id=5005, type=Sugar}, {id=5003, type=Chocolate}, {id=5004, type=Maple}], type=donut}\n"
-      + "_MAP={batters={batter=[{id=1001, type=Regular}, {id=1002, type=Chocolate}]}, id=0003, name=Old Fashioned, ppu=0.55, sales=300, topping=[{id=5001, type=None}, {id=5002, type=Glazed}, {id=5003, type=Chocolate}, {id=5004, type=Maple}], type=donut}\n"
-      + "_MAP={batters={batter=[{id=1001, type=Regular}, {id=1002, type=Chocolate}, {id=1003, type=Blueberry}, {id=1004, type=Devil's Food}]}, filling=[{id=6001, type=None}, {id=6002, type=Raspberry}, {id=6003, type=Lemon}, {id=6004, type=Chocolate}, {id=6005, type=Kreme}], id=0004, name=Filled, ppu=0.69, sales=14, topping=[{id=5001, type=None}, {id=5002, type=Glazed}, {id=5005, type=Sugar}, {id=5007, type=Powdered Sugar}, {id=5006, type=Chocolate with Sprinkles}, {id=5003, type=Chocolate}, {id=5004, type=Maple}], type=donut}\n"
-      + "_MAP={batters={batter=[{id=1001, type=Regular}]}, id=0005, name=Apple Fritter, ppu=1.0, sales=700, topping=[{id=5002, type=Glazed}], type=donut}\n";
+          + "_MAP={batters={batter=[{id=1001, type=Regular}]}, id=0002, name=Raised, ppu=0.69, sales=145, topping=[{id=5001, type=None}, {id=5002, type=Glazed}, {id=5005, type=Sugar}, {id=5003, type=Chocolate}, {id=5004, type=Maple}], type=donut}\n"
+          + "_MAP={batters={batter=[{id=1001, type=Regular}, {id=1002, type=Chocolate}]}, id=0003, name=Old Fashioned, ppu=0.55, sales=300, topping=[{id=5001, type=None}, {id=5002, type=Glazed}, {id=5003, type=Chocolate}, {id=5004, type=Maple}], type=donut}\n"
+          + "_MAP={batters={batter=[{id=1001, type=Regular}, {id=1002, type=Chocolate}, {id=1003, type=Blueberry}, {id=1004, type=Devil's Food}]}, filling=[{id=6001, type=None}, {id=6002, type=Raspberry}, {id=6003, type=Lemon}, {id=6004, type=Chocolate}, {id=6005, type=Kreme}], id=0004, name=Filled, ppu=0.69, sales=14, topping=[{id=5001, type=None}, {id=5002, type=Glazed}, {id=5005, type=Sugar}, {id=5007, type=Powdered Sugar}, {id=5006, type=Chocolate with Sprinkles}, {id=5003, type=Chocolate}, {id=5004, type=Maple}], type=donut}\n"
+          + "_MAP={batters={batter=[{id=1001, type=Regular}]}, id=0005, name=Apple Fritter, ppu=1.0, sales=700, topping=[{id=5002, type=Glazed}], type=donut}\n";
 
-  /** Load driver. */
+  /**
+   * Load driver.
+   */
   public void testLoadDriver() throws ClassNotFoundException {
     Class.forName("org.apache.drill.jdbc.Driver");
   }
 
-  /** Load driver and make a connection. */
+  /**
+   * Load driver and make a connection.
+   */
   public void testConnect() throws Exception {
     Class.forName("org.apache.drill.jdbc.Driver");
     final Connection connection = DriverManager.getConnection(
@@ -67,7 +107,9 @@ public class JdbcTest extends TestCase {
     connection.close();
   }
 
-  /** Load driver, make a connection, prepare a statement. */
+  /**
+   * Load driver, make a connection, prepare a statement.
+   */
   public void testPrepare() throws Exception {
     JdbcAssert.withModel(MODEL, "DONUTS")
         .withConnection(
@@ -85,14 +127,32 @@ public class JdbcTest extends TestCase {
             });
   }
 
-  /** Simple query against JSON. */
+  /**
+   * Simple query against JSON.
+   */
   public void testSelectJson() throws Exception {
     JdbcAssert.withModel(MODEL, "DONUTS")
         .sql("select * from donuts")
         .returns(EXPECTED);
   }
 
-  /** Query with project list. No field references yet. */
+  public void testFullSelectStarEngine() throws Exception {
+    JdbcAssert.withModel(MODEL_FULL_ENGINE, "DONUTS")
+        //.sql("select cast(_MAP['red'] as bigint) + 1 as red_inc from donuts ")
+        .sql("select * from donuts ")
+        .displayResults(50);
+  }
+  
+  public void testFullEngine() throws Exception {
+    JdbcAssert.withModel(MODEL_FULL_ENGINE, "DONUTS")
+        //.sql("select cast(_MAP['red'] as bigint) + 1 as red_inc from donuts ")
+        .sql("select cast(_MAP['RED'] as bigint)  as RED, cast(_MAP['GREEN'] as bigint)  as GREEN from donuts where cast(_MAP['red'] as BIGINT) > 1 ")
+        .displayResults(50);
+  }
+
+  /**
+   * Query with project list. No field references yet.
+   */
   public void testProjectConstant() throws Exception {
     JdbcAssert.withModel(MODEL, "DONUTS")
         .sql("select 1 + 3 as c from donuts")
@@ -103,7 +163,9 @@ public class JdbcTest extends TestCase {
             + "C=4\n");
   }
 
-  /** Query that projects an element from the map. */
+  /**
+   * Query that projects an element from the map.
+   */
   public void testProject() throws Exception {
     JdbcAssert.withModel(MODEL, "DONUTS")
         .sql("select _MAP['ppu'] as ppu from donuts")
@@ -114,11 +176,13 @@ public class JdbcTest extends TestCase {
             + "PPU=1.0\n");
   }
 
-  /** Same logic as {@link #testProject()}, but using a subquery. */
+  /**
+   * Same logic as {@link #testProject()}, but using a subquery.
+   */
   public void testProjectOnSubquery() throws Exception {
     JdbcAssert.withModel(MODEL, "DONUTS")
         .sql("select d['ppu'] as ppu from (\n"
-             + " select _MAP as d from donuts)")
+            + " select _MAP as d from donuts)")
         .returns("PPU=0.55\n"
             + "PPU=0.69\n"
             + "PPU=0.55\n"
@@ -126,22 +190,26 @@ public class JdbcTest extends TestCase {
             + "PPU=1.0\n");
   }
 
-  /** Checks the logical plan. */
+  /**
+   * Checks the logical plan.
+   */
   public void testProjectPlan() throws Exception {
     JdbcAssert.withModel(MODEL, "DONUTS")
         .sql("select _MAP['ppu'] as ppu from donuts")
         .planContains(
             "{'head':{'type':'APACHE_DRILL_LOGICAL','version':'1','generator':{'type':'manual','info':'na'}},"
-            + "'storage':{'donuts-json':{'type':'classpath'},'queue':{'type':'queue'}},"
-            + "'query':["
-            + "{'op':'sequence','do':["
-            + "{'op':'scan','memo':'initial_scan','ref':'_MAP','storageengine':'donuts-json','selection':{'path':'/donuts.json','type':'JSON'}},"
-            + "{'op':'project','projections':[{'expr':'_MAP.ppu','ref':'output.PPU'}]},"
-            + "{'op':'store','storageengine':'queue','memo':'output sink','target':{'number':0}}]}]}");
+                + "'storage':{'donuts-json':{'type':'classpath'},'queue':{'type':'queue'}},"
+                + "'query':["
+                + "{'op':'sequence','do':["
+                + "{'op':'scan','memo':'initial_scan','ref':'_MAP','storageengine':'donuts-json','selection':{'path':'/donuts.json','type':'JSON'}},"
+                + "{'op':'project','projections':[{'expr':'_MAP.ppu','ref':'output.PPU'}]},"
+                + "{'op':'store','storageengine':'queue','memo':'output sink','target':{'number':0}}]}]}");
   }
 
-  /** Query with subquery, filter, and projection of one real and one
-   * nonexistent field from a map field. */
+  /**
+   * Query with subquery, filter, and projection of one real and one
+   * nonexistent field from a map field.
+   */
   public void testProjectFilterSubquery() throws Exception {
     JdbcAssert.withModel(MODEL, "DONUTS")
         .sql("select d['name'] as name, d['xx'] as xx from (\n"
@@ -159,16 +227,18 @@ public class JdbcTest extends TestCase {
             + "where cast(d['ppu'] as double) > 0.6")
         .planContains(
             "{'head':{'type':'APACHE_DRILL_LOGICAL','version':'1','generator':{'type':'manual','info':'na'}},'storage':{'donuts-json':{'type':'classpath'},'queue':{'type':'queue'}},"
-            + "'query':["
-            + "{'op':'sequence','do':["
-            + "{'op':'scan','memo':'initial_scan','ref':'_MAP','storageengine':'donuts-json','selection':{'path':'/donuts.json','type':'JSON'}},"
-            + "{'op':'filter','expr':'(_MAP.donuts.ppu > 0.6)'},"
-            + "{'op':'project','projections':[{'expr':'_MAP.donuts','ref':'output.D'}]},"
-            + "{'op':'project','projections':[{'expr':'D.name','ref':'output.NAME'},{'expr':'D.xx','ref':'output.XX'}]},"
-            + "{'op':'store','storageengine':'queue','memo':'output sink','target':{'number':0}}]}]}");
+                + "'query':["
+                + "{'op':'sequence','do':["
+                + "{'op':'scan','memo':'initial_scan','ref':'_MAP','storageengine':'donuts-json','selection':{'path':'/donuts.json','type':'JSON'}},"
+                + "{'op':'filter','expr':'(_MAP.donuts.ppu > 0.6)'},"
+                + "{'op':'project','projections':[{'expr':'_MAP.donuts','ref':'output.D'}]},"
+                + "{'op':'project','projections':[{'expr':'D.name','ref':'output.NAME'},{'expr':'D.xx','ref':'output.XX'}]},"
+                + "{'op':'store','storageengine':'queue','memo':'output sink','target':{'number':0}}]}]}");
   }
 
-  /** Query that projects one field. (Disabled; uses sugared syntax.) */
+  /**
+   * Query that projects one field. (Disabled; uses sugared syntax.)
+   */
   public void _testProjectNestedFieldSugared() throws Exception {
     JdbcAssert.withModel(MODEL, "DONUTS")
         .sql("select donuts.ppu from donuts")
@@ -179,7 +249,9 @@ public class JdbcTest extends TestCase {
             + "C=4\n");
   }
 
-  /** Query with filter. No field references yet. */
+  /**
+   * Query with filter. No field references yet.
+   */
   public void testFilterConstantFalse() throws Exception {
     JdbcAssert.withModel(MODEL, "DONUTS")
         .sql("select * from donuts where 3 > 4")