You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/06 23:46:30 UTC

[1/3] Sort operator using HyperBatch concept. Supports single and multikey asc/desc sort. One comparator function definition. Add abstract record batch, vector container and vector wrapper. Various fixes to return to client when query fails.

Updated Branches:
  refs/heads/master 31f196497 -> db3afaa85


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
index ec17c63..38a4236 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
@@ -3,6 +3,7 @@ package org.apache.drill.exec.expr;
 import static org.junit.Assert.assertEquals;
 import mockit.Expectations;
 import mockit.Injectable;
+import mockit.NonStrict;
 import mockit.NonStrictExpectations;
 
 import org.antlr.runtime.ANTLRStringStream;
@@ -23,8 +24,10 @@ import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.physical.impl.project.Projector;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.ValueVector;
 import org.junit.AfterClass;
 import org.junit.Test;
 
@@ -37,15 +40,18 @@ public class ExpressionTest {
   }
 
   @Test
-  public void testSpecial(final @Injectable RecordBatch batch) throws Exception {
-    final TypedFieldId tfid = new TypedFieldId(Types.optional(MinorType.INT),0);
-
+  public void testSpecial(final @Injectable RecordBatch batch, @Injectable ValueVector vector) throws Exception {
+    final TypedFieldId tfid = new TypedFieldId(Types.optional(MinorType.INT),0, false);
+    
     new NonStrictExpectations() {
+      @NonStrict VectorWrapper<?> wrapper;
       {
         batch.getValueVectorId(new SchemaPath("alpha", ExpressionPosition.UNKNOWN));
         result = tfid;
-        batch.getValueVectorById(tfid.getFieldId(), IntVector.class);
-        result = new IntVector(null, null);
+        batch.getValueAccessorById(tfid.getFieldId(), IntVector.class);
+        result = wrapper;
+        wrapper.getValueVector(); 
+        result = new IntVector(null, null); 
       }
 
     };
@@ -54,7 +60,7 @@ public class ExpressionTest {
 
   @Test
   public void testSchemaExpression(final @Injectable RecordBatch batch) throws Exception {
-    final TypedFieldId tfid = new TypedFieldId(Types.optional(MinorType.BIGINT), 0);
+    final TypedFieldId tfid = new TypedFieldId(Types.optional(MinorType.BIGINT), 0, false);
 
     new Expectations() {
       {
@@ -89,7 +95,7 @@ public class ExpressionTest {
     }
 
     CodeGenerator<Projector> cg = new CodeGenerator<Projector>(Projector.TEMPLATE_DEFINITION, new FunctionImplementationRegistry(DrillConfig.create()));
-    cg.addExpr(new ValueVectorWriteExpression(-1, materializedExpr));
+    cg.addExpr(new ValueVectorWriteExpression(new TypedFieldId(materializedExpr.getMajorType(), -1), materializedExpr));
     return cg.generate();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
index 5e7c599..dddd322 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
@@ -1,15 +1,19 @@
 package org.apache.drill.exec.physical.impl;
 
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.impl.ScreenCreator.ScreenRoot;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
-import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.vector.ValueVector;
 
+import com.beust.jcommander.internal.Lists;
+
 public class SimpleRootExec implements RootExec, Iterable<ValueVector>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleRootExec.class);
 
@@ -28,9 +32,10 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector>{
     return incoming.getSelectionVector2();
   }
 
+  @SuppressWarnings("unchecked")
   public <T extends ValueVector> T getValueVectorById(SchemaPath path, Class<?> vvClass){
     TypedFieldId tfid = incoming.getValueVectorId(path);
-    return incoming.getValueVectorById(tfid.getFieldId(), vvClass);
+    return (T) incoming.getValueAccessorById(tfid.getFieldId(), vvClass).getValueVector();
   }
   
   @Override
@@ -40,11 +45,16 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector>{
 
   @Override
   public void stop() {
+    incoming.kill();
   }
 
   @Override
   public Iterator<ValueVector> iterator() {
-    return incoming.iterator();
+    List<ValueVector> vv = Lists.newArrayList();
+    for(VectorWrapper<?> vw : incoming){
+      vv.add(vw.getValueVector());
+    }
+    return vv.iterator();
   }
 
   public int getRecordCount(){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
index 7b002ea..e21289c 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
@@ -17,7 +17,7 @@
  ******************************************************************************/
 package org.apache.drill.exec.physical.impl;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 import java.util.List;
 
@@ -26,6 +26,7 @@ import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.pop.PopUnitTestBase;
 import org.apache.drill.exec.proto.UserProtos.QueryType;
 import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.RemoteServiceSet;
@@ -53,14 +54,14 @@ public class TestSimpleFragmentRun extends PopUnitTestBase {
     RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator());
     int recordCount = 0;
     for (QueryResultBatch batch : results) {
-      if(!batch.hasData()) continue;
+
       boolean schemaChanged = batchLoader.load(batch.getHeader().getDef(), batch.getData());
       boolean firstColumn = true;
 
       // print headers.
       if (schemaChanged) {
         System.out.println("\n\n========NEW SCHEMA=========\n\n");
-        for (ValueVector value : batchLoader) {
+        for (VectorWrapper<?> value : batchLoader) {
 
           if (firstColumn) {
             firstColumn = false;
@@ -79,13 +80,13 @@ public class TestSimpleFragmentRun extends PopUnitTestBase {
       for (int i = 0; i < batchLoader.getRecordCount(); i++) {
         boolean first = true;
         recordCount++;
-        for (ValueVector value : batchLoader) {
+        for (VectorWrapper<?> value : batchLoader) {
           if (first) {
             first = false;
           } else {
             System.out.print("\t");
           }
-          System.out.print(value.getAccessor().getObject(i));
+          System.out.print(value.getValueVector().getAccessor().getObject(i));
         }
         if(!first) System.out.println();
       }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
new file mode 100644
index 0000000..af5ec06
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
@@ -0,0 +1,145 @@
+package org.apache.drill.exec.physical.impl.sort;
+
+import static org.junit.Assert.assertTrue;
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.SimpleRootExec;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.IntVector;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.yammer.metrics.MetricRegistry;
+
+public class TestSimpleSort {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSimpleSort.class);
+  DrillConfig c = DrillConfig.create();
+  
+  
+  @Test
+  public void sortOneKeyAscending(@Injectable final DrillbitContext bitContext, @Injectable UserClientConnection connection) throws Throwable{
+
+
+    new NonStrictExpectations(){{
+      bitContext.getMetrics(); result = new MetricRegistry("test");
+      bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+    }};
+    
+    
+    PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+    PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/sort/one_key_sort.json"), Charsets.UTF_8));
+    FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+    FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+    SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+    
+    int previousInt = Integer.MIN_VALUE;
+
+    int recordCount = 0;
+    int batchCount = 0;
+
+    while(exec.next()){
+      batchCount++;
+      IntVector c1 = exec.getValueVectorById(new SchemaPath("blue", ExpressionPosition.UNKNOWN), IntVector.class);
+      IntVector c2 = exec.getValueVectorById(new SchemaPath("green", ExpressionPosition.UNKNOWN), IntVector.class);
+      
+      IntVector.Accessor a1 = c1.getAccessor();
+      IntVector.Accessor a2 = c2.getAccessor();
+      
+      for(int i =0; i < c1.getAccessor().getValueCount(); i++){
+        recordCount++;
+        assert previousInt <= a1.get(i);
+        previousInt = a1.get(i);
+        assert previousInt == a2.get(i);
+      }
+     
+      
+    }
+    
+    System.out.println(String.format("Sorted %,d records in %d batches.", recordCount, batchCount));
+
+    if(context.getFailureCause() != null){
+      throw context.getFailureCause();
+    }
+    assertTrue(!context.isFailed());
+  }
+  
+  @Test
+  public void sortTwoKeysOneAscendingOneDescending(@Injectable final DrillbitContext bitContext, @Injectable UserClientConnection connection) throws Throwable{
+
+
+    new NonStrictExpectations(){{
+      bitContext.getMetrics(); result = new MetricRegistry("test");
+      bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+    }};
+    
+    
+    PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+    PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/sort/two_key_sort.json"), Charsets.UTF_8));
+    FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+    FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+    SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+    
+    int previousInt = Integer.MIN_VALUE;
+    long previousLong = Long.MAX_VALUE;
+    
+    int recordCount = 0;
+    int batchCount = 0;
+
+    while(exec.next()){
+      batchCount++;
+      IntVector c1 = exec.getValueVectorById(new SchemaPath("blue", ExpressionPosition.UNKNOWN), IntVector.class);
+      BigIntVector c2 = exec.getValueVectorById(new SchemaPath("alt", ExpressionPosition.UNKNOWN), BigIntVector.class);
+      
+      IntVector.Accessor a1 = c1.getAccessor();
+      BigIntVector.Accessor a2 = c2.getAccessor();
+      
+      for(int i =0; i < c1.getAccessor().getValueCount(); i++){
+        recordCount++;
+        assert previousInt <= a1.get(i);
+        
+        if(previousInt != a1.get(i)){
+          previousLong = Long.MAX_VALUE;
+          previousInt = a1.get(i);
+        }
+        
+        assert previousLong >= a2.get(i);
+        
+        //System.out.println(previousInt + "\t" + a2.get(i));
+        
+      }
+     
+      
+    }
+    
+    System.out.println(String.format("Sorted %,d records in %d batches.", recordCount, batchCount));
+
+    if(context.getFailureCause() != null){
+      throw context.getFailureCause();
+    }
+    assertTrue(!context.isFailed());
+  }
+  
+  @AfterClass
+  public static void tearDown() throws Exception{
+    // pause to get logger to catch up.
+    Thread.sleep(1000);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
index dae5f78..8a1736c 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
@@ -28,7 +28,6 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
 import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
-import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/test/resources/sort/one_key_sort.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/sort/one_key_sort.json b/sandbox/prototype/exec/java-exec/src/test/resources/sort/one_key_sort.json
new file mode 100644
index 0000000..3bd0b71
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/sort/one_key_sort.json
@@ -0,0 +1,44 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+	graph:[
+        {
+            @id:1,
+            pop:"mock-scan",
+            url: "http://apache.org",
+            entries:[
+            	{records: 1000000, types: [
+            	  {name: "blue", type: "INT", mode: "REQUIRED"},
+            	  {name: "green", type: "INT", mode: "REQUIRED"}
+            	]},
+              {records: 1000000, types: [
+                {name: "blue", type: "INT", mode: "REQUIRED"},
+                {name: "green", type: "INT", mode: "REQUIRED"}
+              ]}
+            ]
+        },
+        {
+            @id:2,
+            child: 1,
+            pop:"sort",
+            orderings: [
+              {expr: "blue"}
+            ]
+        },
+        {
+            @id:3,
+            child: 2,
+            pop:"selection-vector-remover"
+        },
+        {
+            @id: 4,
+            child: 3,
+            pop: "screen"
+        }
+    ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/test/resources/sort/two_key_sort.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/sort/two_key_sort.json b/sandbox/prototype/exec/java-exec/src/test/resources/sort/two_key_sort.json
new file mode 100644
index 0000000..2394626
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/sort/two_key_sort.json
@@ -0,0 +1,54 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+	graph:[
+        {
+            @id:1,
+            pop:"mock-scan",
+            url: "http://apache.org",
+            entries:[
+            	{records: 100, types: [
+            	  {name: "blue", type: "INT", mode: "REQUIRED"},
+            	  {name: "green", type: "INT", mode: "REQUIRED"}
+            	]},
+              {records: 100, types: [
+                {name: "blue", type: "INT", mode: "REQUIRED"},
+                {name: "green", type: "INT", mode: "REQUIRED"}
+              ]}
+            ]
+        },
+        {
+            @id:2,
+            child: 1,
+            pop:"project",
+            exprs: [
+              { ref: "blue", expr:"blue" },
+              { ref: "alt", expr:"alternate3()" }
+            ]
+        },
+        {
+            @id:3,
+            child: 2,
+            pop:"sort",
+            orderings: [
+              {expr: "blue"},
+              {expr: "alt", order: "DESC"}
+            ]
+        },
+        {
+            @id:4,
+            child: 3,
+            pop:"selection-vector-remover"
+        },
+        {
+            @id: 5,
+            child: 4,
+            pop: "screen"
+        }
+    ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchListener.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchListener.java
index b3a7e35..2209b45 100644
--- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchListener.java
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchListener.java
@@ -2,6 +2,7 @@ package org.apache.drill.sql.client.full;
 
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
@@ -11,7 +12,7 @@ public class BatchListener implements UserResultsListener {
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchListener.class);
 
-  private RpcException ex;
+  private volatile RpcException ex;
   private volatile boolean completed = false;
   
   final BlockingQueue<QueryResultBatch> queue = new ArrayBlockingQueue<>(100);
@@ -36,11 +37,15 @@ public class BatchListener implements UserResultsListener {
   }
 
   public QueryResultBatch getNext() throws RpcException, InterruptedException{
-    if(ex != null) throw ex;
-    if(completed && queue.isEmpty()){
-      return null;
-    }else{
-      return queue.take();
+    while(true){
+      if(ex != null) throw ex;
+      if(completed && queue.isEmpty()){
+        return null;
+      }else{
+        QueryResultBatch q = queue.poll(50, TimeUnit.MILLISECONDS);
+        if(q != null) return q;
+      }
+      
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchLoaderMap.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchLoaderMap.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchLoaderMap.java
index 4eb243d..a60f92b 100644
--- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchLoaderMap.java
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchLoaderMap.java
@@ -10,6 +10,7 @@ import jline.internal.Preconditions;
 
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
 import org.apache.drill.exec.server.DrillbitContext;
@@ -46,8 +47,8 @@ public class BatchLoaderMap implements Map<String, Object> {
     boolean schemaChanged = loader.load(batch.getHeader().getDef(), batch.getData());
     if (schemaChanged) {
       fields.clear();
-      for (ValueVector v : loader) {
-        fields.put(v.getField().getName(), v);
+      for (VectorWrapper<?> v : loader) {
+        fields.put(v.getField().getName(), v.getValueVector());
       }
     } else {
       logger.debug("Schema didn't change. {}", batch);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java
index 281871e..79b8ef8 100644
--- a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java
+++ b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java
@@ -3,7 +3,11 @@ package org.apache.drill.jdbc.test;
 import java.io.IOException;
 
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
 
 import com.google.common.base.Charsets;
 import com.google.common.io.Resources;
@@ -13,6 +17,12 @@ public class FullEngineTest {
 
   private static String MODEL_FULL_ENGINE;
 
+  // Determine if we are in Eclipse Debug mode.
+  static final boolean IS_DEBUG = java.lang.management.ManagementFactory.getRuntimeMXBean().getInputArguments().toString().indexOf("-agentlib:jdwp") > 0;
+
+  // Set a timeout unless we're debugging.
+  @Rule public TestRule globalTimeout = IS_DEBUG ? new TestName() : new Timeout(10000);
+  
   @BeforeClass
   public static void setupFixtures() throws IOException {
     MODEL_FULL_ENGINE = Resources.toString(Resources.getResource("full-model.json"), Charsets.UTF_8);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java
index 1900dc3..3bde4ed 100644
--- a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java
+++ b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java
@@ -25,7 +25,11 @@ import java.sql.Statement;
 import org.apache.drill.exec.ref.ReferenceInterpreter;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Function;
@@ -36,6 +40,7 @@ public class JdbcTest {
   private static String MODEL;
   private static String EXPECTED;
 
+
   @BeforeClass
   public static void setupFixtures() throws IOException {
     MODEL = Resources.toString(Resources.getResource("test-models.json"), Charsets.UTF_8);


[2/3] Sort operator using HyperBatch concept. Supports single and multikey asc/desc sort. One comparator function definition. Add abstract record batch, vector container and vector wrapper. Various fixes to return to client when query fails.

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
new file mode 100644
index 0000000..daf96fc
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -0,0 +1,130 @@
+package org.apache.drill.exec.physical.impl.sort;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.List;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.BufferAllocator.PreAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.ValueVector;
+
+import com.google.common.collect.ArrayListMultimap;
+
+public class SortRecordBatchBuilder {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortRecordBatchBuilder.class);
+  
+  private final ArrayListMultimap<BatchSchema, RecordBatchData> batches = ArrayListMultimap.create();
+  private final VectorContainer container;
+
+  private int recordCount;
+  private long runningBytes;
+  private long runningBatches;
+  private final long maxBytes;
+  private SelectionVector4 sv4;
+  final PreAllocator svAllocator;
+  
+  public SortRecordBatchBuilder(BufferAllocator a, long maxBytes, VectorContainer container){
+    this.maxBytes = maxBytes;
+    this.svAllocator = a.getPreAllocator();
+    this.container = container;
+  }
+  
+  private long getSize(RecordBatch batch){
+    long bytes = 0;
+    for(VectorWrapper<?> v : batch){
+      bytes += v.getValueVector().getBufferSize();
+    }
+    return bytes;
+  }
+  
+  /**
+   * Add another record batch to the set of record batches.  
+   * @param batch
+   * @return True if the requested add completed successfully.  Returns false in the case that this builder is full and cannot receive additional packages. 
+   * @throws SchemaChangeException
+   */
+  public boolean add(RecordBatch batch){
+    if(batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) throw new UnsupportedOperationException("A sort cannot currently work against a sv4 batch.");
+    if (batch.getRecordCount() == 0) return true; // skip over empty record batches.
+
+    long batchBytes = getSize(batch);
+    if(batchBytes + runningBytes > maxBytes) return false; // enough data memory.
+    if(runningBatches+1 > Character.MAX_VALUE) return false; // allowed in batch.
+    if(!svAllocator.preAllocate(batch.getRecordCount()*4)) return false;  // sv allocation available.
+      
+   
+    RecordBatchData bd = new RecordBatchData(batch);
+    runningBytes += batchBytes;
+    batches.put(batch.getSchema(), bd);
+    recordCount += bd.getRecordCount();
+    return true;
+  }
+
+  public void build(FragmentContext context) throws SchemaChangeException{
+    container.clear();
+    if(batches.keySet().size() > 1) throw new SchemaChangeException("Sort currently only supports a single schema.");
+    if(batches.size() > Character.MAX_VALUE) throw new SchemaChangeException("Sort cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE);
+    sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE);
+    BatchSchema schema = batches.keySet().iterator().next();
+    List<RecordBatchData> data = batches.get(schema);
+    
+    // now we're going to generate the sv4 pointers
+    switch(schema.getSelectionVectorMode()){
+    case NONE: {
+      int index = 0;
+      int recordBatchId = 0;
+      for(RecordBatchData d : data){
+        for(int i =0; i < d.getRecordCount(); i++, index++){
+          sv4.set(index, recordBatchId, i);
+        }
+        recordBatchId++;
+      }
+      break;
+    }
+    case TWO_BYTE: {
+      int index = 0;
+      int recordBatchId = 0;
+      for(RecordBatchData d : data){
+        for(int i =0; i < d.getRecordCount(); i++, index++){
+          sv4.set(index, recordBatchId, (int) d.getSv2().getIndex(i));
+        }
+        // might as well drop the selection vector since we'll stop using it now.
+        d.getSv2().clear();
+        recordBatchId++;
+      }
+      break;
+    }
+    default:
+      throw new UnsupportedOperationException();
+    }
+    
+    // next, we'll create lists of each of the vector types.
+    ArrayListMultimap<MaterializedField, ValueVector> vectors = ArrayListMultimap.create();
+    for(RecordBatchData rbd : batches.values()){
+      for(ValueVector v : rbd.vectors){
+        vectors.put(v.getField(), v);
+      }
+    }
+    
+    for(MaterializedField f : vectors.keySet()){
+      List<ValueVector> v = vectors.get(f);
+      container.addHyperList(v);
+    }
+    
+    container.buildSchema(SelectionVectorMode.FOUR_BYTE);
+  }
+
+  public SelectionVector4 getSv4() {
+    return sv4;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
new file mode 100644
index 0000000..c45f500
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
@@ -0,0 +1,45 @@
+package org.apache.drill.exec.physical.impl.sort;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.hadoop.util.IndexedSortable;
+import org.apache.hadoop.util.QuickSort;
+
+public abstract class SortTemplate implements Sorter, IndexedSortable{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortTemplate.class);
+  
+  private SelectionVector4 vector4;
+  
+  
+  public void setup(FragmentContext context, RecordBatch hyperBatch) throws SchemaChangeException{
+    // we pass in the local hyperBatch since that is where we'll be reading data.
+    vector4 = hyperBatch.getSelectionVector4();
+    doSetup(context, hyperBatch, null);
+  }
+  
+  @Override
+  public void sort(SelectionVector4 vector4, VectorContainer container){
+    QuickSort qs = new QuickSort();
+    qs.sort(this, 0, vector4.getTotalCount());
+  }
+
+  @Override
+  public void swap(int sv0, int sv1) {
+    int tmp = vector4.get(sv0);
+    vector4.set(sv0, vector4.get(sv1));
+    vector4.set(sv1, tmp);
+  }
+  
+  @Override
+  public int compare(int inIndex, int outIndex) {
+    int sv1 = vector4.get(inIndex);
+    int sv2 = vector4.get(outIndex);
+    return doEval(sv1, sv2);
+  }
+
+  public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
+  public abstract int doEval(int inIndex, int outIndex);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
new file mode 100644
index 0000000..bc4fae5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
@@ -0,0 +1,17 @@
+package org.apache.drill.exec.physical.impl.sort;
+
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+public interface Sorter {
+  public void setup(FragmentContext context, RecordBatch hyperBatch) throws SchemaChangeException;
+  public void sort(SelectionVector4 vector4, VectorContainer container);
+  
+  public static TemplateClassDefinition<Sorter> TEMPLATE_DEFINITION = new TemplateClassDefinition<Sorter>( //
+      Sorter.class, "org.apache.drill.exec.physical.impl.sort.SortTemplate", Comparator.class, int.class);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
index 55d6ba2..ce17a2b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
@@ -8,8 +8,11 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 
 public interface Copier {
-  public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION = new TemplateClassDefinition<Copier>( //
-      Copier.class, "org.apache.drill.exec.physical.impl.svremover.CopierTemplate", CopyEvaluator.class, null);
+  public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION2 = new TemplateClassDefinition<Copier>( //
+      Copier.class, "org.apache.drill.exec.physical.impl.svremover.CopierTemplate2", CopyEvaluator.class, null);
+
+  public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Copier>( //
+      Copier.class, "org.apache.drill.exec.physical.impl.svremover.CopierTemplate4", CopyEvaluator.class, null);
 
   public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException;
   public abstract void copyRecords();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java
deleted file mode 100644
index 6a0e2c3..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.drill.exec.physical.impl.svremover;
-
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch.VectorAllocator;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-
-public abstract class CopierTemplate implements Copier{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate.class);
-  
-  private SelectionVector2 sv2;
-  private VectorAllocator[] allocators;
-  
-  @Override
-  public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException{
-    this.allocators = allocators;
-    this.sv2 = incoming.getSelectionVector2();
-    doSetup(context, incoming, outgoing);
-  }
-  
-  private void allocateVectors(int recordCount){
-    for(VectorAllocator a : allocators){
-      a.alloc(recordCount);
-    }
-  }
-  
-  @Override
-  public void copyRecords(){
-    final int recordCount = sv2.getCount();
-    allocateVectors(recordCount);
-    int outgoingPosition = 0;
-    
-    for(int svIndex = 0; svIndex < recordCount; svIndex++, outgoingPosition++){
-      doEval(svIndex, outgoingPosition);
-    }
-  }
-  
-  public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
-  public abstract void doEval(int incoming, int outgoing);
-        
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
new file mode 100644
index 0000000..4dc38f2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
@@ -0,0 +1,43 @@
+package org.apache.drill.exec.physical.impl.svremover;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch.VectorAllocator;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+
+public abstract class CopierTemplate2 implements Copier{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate2.class);
+  
+  private SelectionVector2 sv2;
+  private VectorAllocator[] allocators;
+  
+  @Override
+  public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException{
+    this.allocators = allocators;
+    this.sv2 = incoming.getSelectionVector2();
+    doSetup(context, incoming, outgoing);
+  }
+  
+  private void allocateVectors(int recordCount){
+    for(VectorAllocator a : allocators){
+      a.alloc(recordCount);
+    }
+  }
+  
+  @Override
+  public void copyRecords(){
+    final int recordCount = sv2.getCount();
+    allocateVectors(recordCount);
+    int outgoingPosition = 0;
+    
+    for(int svIndex = 0; svIndex < recordCount; svIndex++, outgoingPosition++){
+      doEval(svIndex, outgoingPosition);
+    }
+  }
+  
+  public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
+  public abstract void doEval(int incoming, int outgoing);
+        
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
new file mode 100644
index 0000000..2cf033e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
@@ -0,0 +1,47 @@
+package org.apache.drill.exec.physical.impl.svremover;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch.VectorAllocator;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+public abstract class CopierTemplate4 implements Copier{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate4.class);
+  
+  private SelectionVector4 sv4;
+  private VectorAllocator[] allocators;
+
+  
+  private void allocateVectors(int recordCount){
+    for(VectorAllocator a : allocators){
+      a.alloc(recordCount);
+    }
+  }
+  
+  @Override
+  public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException{
+    this.allocators = allocators;
+    this.sv4 = incoming.getSelectionVector4();
+    doSetup(context, incoming, outgoing);
+  }
+  
+
+  @Override
+  public void copyRecords(){
+    final int recordCount = sv4.getLength();
+    allocateVectors(recordCount);
+    int outgoingPosition = 0;
+    final int end = sv4.getStart() + sv4.getLength();
+    for(int svIndex = sv4.getStart(); svIndex < end; svIndex++, outgoingPosition++){
+      int deRefIndex = sv4.get(svIndex);
+      doEval(deRefIndex, outgoingPosition);
+    }
+  }
+  
+  public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
+  public abstract void doEval(int incoming, int outgoing);
+        
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 68793b0..64e89ee 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -1,24 +1,22 @@
 package org.apache.drill.exec.physical.impl.svremover;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.List;
 
-import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.VectorHolder;
-import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.physical.config.SelectionVectorRemover;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.SchemaBuilder;
 import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.TypeHelper;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
 
@@ -31,36 +29,14 @@ import com.sun.codemodel.JExpression;
 import com.sun.codemodel.JType;
 import com.sun.codemodel.JVar;
 
-public class RemovingRecordBatch implements RecordBatch{
+public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVectorRemover>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemovingRecordBatch.class);
 
-  private final RecordBatch incoming;
-  private final FragmentContext context;
-  private BatchSchema outSchema;
   private Copier copier;
-  private List<ValueVector> outputVectors;
-  private VectorHolder vh;
   private int recordCount;
   
-  public RemovingRecordBatch(RecordBatch incoming, FragmentContext context){
-    this.incoming = incoming;
-    this.context = context;
-  }
-  
-  @Override
-  public Iterator<ValueVector> iterator() {
-    return outputVectors.iterator();
-  }
-
-  @Override
-  public FragmentContext getContext() {
-    return context;
-  }
-
-  @Override
-  public BatchSchema getSchema() {
-    Preconditions.checkNotNull(outSchema);
-    return outSchema;
+  public RemovingRecordBatch(SelectionVectorRemover popConfig, FragmentContext context, RecordBatch incoming) {
+    super(popConfig, context, incoming);
   }
 
   @Override
@@ -69,64 +45,36 @@ public class RemovingRecordBatch implements RecordBatch{
   }
 
   @Override
-  public void kill() {
-    incoming.kill();
-  }
-
-  @Override
-  public SelectionVector2 getSelectionVector2() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public SelectionVector4 getSelectionVector4() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public TypedFieldId getValueVectorId(SchemaPath path) {
-    return vh.getValueVector(path);
-  }
+  protected void setupNewSchema() throws SchemaChangeException {
+    container.clear();
+    
+    switch(incoming.getSchema().getSelectionVectorMode()){
+    case NONE:
+      this.copier = getStraightCopier();
+      break;
+    case TWO_BYTE:
+      this.copier = getGenerated2Copier();
+      break;
+    case FOUR_BYTE:
+      this.copier = getGenerated4Copier();
+      break;
+    default:
+      throw new UnsupportedOperationException();
+    }
+    
+    container.buildSchema(SelectionVectorMode.NONE);
 
-  @Override
-  public <T extends ValueVector> T getValueVectorById(int fieldId, Class<?> clazz) {
-    return vh.getValueVector(fieldId, clazz);
   }
 
   @Override
-  public IterOutcome next() {
-    recordCount = 0;
-    IterOutcome upstream = incoming.next();
-    logger.debug("Upstream... {}", upstream);
-    switch(upstream){
-    case NONE:
-    case NOT_YET:
-    case STOP:
-      return upstream;
-    case OK_NEW_SCHEMA:
-      try{
-        copier = createCopier();
-      }catch(SchemaChangeException ex){
-        incoming.kill();
-        logger.error("Failure during query", ex);
-        context.fail(ex);
-        return IterOutcome.STOP;
-      }
-      // fall through.
-    case OK:
-      recordCount = incoming.getRecordCount();
-      copier.copyRecords();
-      for(ValueVector v : this.outputVectors){
-        ValueVector.Mutator m = v.getMutator();
-        m.setValueCount(recordCount);
-      }
-      return upstream; // change if upstream changed, otherwise normal.
-    default:
-      throw new UnsupportedOperationException();
+  protected void doWork() {
+    recordCount = incoming.getRecordCount();
+    copier.copyRecords();
+    for(VectorWrapper<?> v : container){
+      ValueVector.Mutator m = v.getValueVector().getMutator();
+      m.setValueCount(recordCount);
     }
   }
-  
-  
 
   
   private class StraightCopier implements Copier{
@@ -136,8 +84,8 @@ public class RemovingRecordBatch implements RecordBatch{
     
     @Override
     public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators){
-      for(ValueVector vv : incoming){
-        TransferPair tp = vv.getTransferPair();
+      for(VectorWrapper<?> vv : incoming){
+        TransferPair tp = vv.getValueVector().getTransferPair();
         pairs.add(tp);
         out.add(tp.getTo());
       }
@@ -159,23 +107,23 @@ public class RemovingRecordBatch implements RecordBatch{
   private Copier getStraightCopier(){
     StraightCopier copier = new StraightCopier();
     copier.setupRemover(context, incoming, this, null);
-    outputVectors.addAll(copier.getOut());
+    container.addCollection(copier.getOut());
     return copier;
   }
   
-  private Copier getGeneratedCopier() throws SchemaChangeException{
+  private Copier getGenerated2Copier() throws SchemaChangeException{
     Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
     
     List<VectorAllocator> allocators = Lists.newArrayList();
-    for(ValueVector i : incoming){
-      TransferPair t = i.getTransferPair();
-      outputVectors.add(t.getTo());
-      allocators.add(getAllocator(i, t.getTo()));
+    for(VectorWrapper<?> i : incoming){
+      ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator());
+      container.add(v);
+      allocators.add(getAllocator(i.getValueVector(), v));
     }
 
     try {
-      final CodeGenerator<Copier> cg = new CodeGenerator<Copier>(Copier.TEMPLATE_DEFINITION, context.getFunctionRegistry());
-      generateCopies(cg);
+      final CodeGenerator<Copier> cg = new CodeGenerator<Copier>(Copier.TEMPLATE_DEFINITION2, context.getFunctionRegistry());
+      generateCopies(cg, false);
       Copier copier = context.getImplementationClass(cg);
       copier.setupRemover(context, incoming, this, allocators.toArray(new VectorAllocator[allocators.size()]));
       return copier;
@@ -184,73 +132,77 @@ public class RemovingRecordBatch implements RecordBatch{
     }
   }
   
-  
-  private Copier createCopier() throws SchemaChangeException{
-    if(outputVectors != null){
-      for(ValueVector v : outputVectors){
-        v.close();
-      }
-    }
-    this.outputVectors = Lists.newArrayList();
-    this.vh = new VectorHolder(outputVectors);
-
-    SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(SelectionVectorMode.NONE);
-    for(ValueVector v : incoming){
-      bldr.addField(v.getField());
-    }
-    this.outSchema = bldr.build();
+  private Copier getGenerated4Copier() throws SchemaChangeException{
+    Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE);
     
-    switch(incoming.getSchema().getSelectionVectorMode()){
-    case NONE:
-      return getStraightCopier();
-    case TWO_BYTE:
-      return getGeneratedCopier();
-    default:
-      throw new UnsupportedOperationException();
+    List<VectorAllocator> allocators = Lists.newArrayList();
+    for(VectorWrapper<?> i : incoming){
+      
+      ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator());
+      container.add(v);
+      allocators.add(getAllocator4(v));
     }
 
+    try {
+      final CodeGenerator<Copier> cg = new CodeGenerator<Copier>(Copier.TEMPLATE_DEFINITION4, context.getFunctionRegistry());
+      generateCopies(cg, true);
+      Copier copier = context.getImplementationClass(cg);
+      copier.setupRemover(context, incoming, this, allocators.toArray(new VectorAllocator[allocators.size()]));
+      return copier;
+    } catch (ClassTransformationException | IOException e) {
+      throw new SchemaChangeException("Failure while attempting to load generated class", e);
+    }
   }
   
-  private void generateCopies(CodeGenerator<Copier> g){
+  private void generateCopies(CodeGenerator<Copier> g, boolean hyper){
     // we have parallel ids for each value vector so we don't actually have to deal with managing the ids at all.
     int fieldId = 0;
     
-
-
     JExpression inIndex = JExpr.direct("inIndex");
     JExpression outIndex = JExpr.direct("outIndex");
     g.rotateBlock();
-    for(ValueVector vv : incoming){
-      JClass vvClass = (JClass) g.getModel()._ref(vv.getClass());
-      JVar inVV = declareVVSetup("incoming", g, fieldId, vvClass);
-      JVar outVV = declareVVSetup("outgoing", g, fieldId, vvClass);
+    for(VectorWrapper<?> vv : incoming){
+      JVar inVV = g.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vv.getField().getType(), fieldId, vv.isHyper()));
+      JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), fieldId, false));
+
+      if(hyper){
+        
+        g.getBlock().add( 
+            outVV
+            .invoke("copyFrom")
+            .arg(
+                inIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
+            .arg(outIndex)
+            .arg(
+                inVV.component(inIndex.shrz(JExpr.lit(16)))
+                )
+            );  
+      }else{
+        g.getBlock().add(outVV.invoke("copyFrom").arg(inIndex).arg(outIndex).arg(inVV));
+      }
       
-      g.getBlock().add(outVV.invoke("copyFrom").arg(inIndex).arg(outIndex).arg(inVV));
       
       fieldId++;
     }
   }
   
-  private JVar declareVVSetup(String varName, CodeGenerator<?> g, int fieldId, JClass vvClass){
-    JVar vv = g.declareClassField("vv", vvClass);
-    JClass t = (JClass) g.getModel()._ref(SchemaChangeException.class);
-    JType objClass = g.getModel()._ref(Object.class);
-    JBlock b = g.getSetupBlock();
-    JVar obj = b.decl( //
-        objClass, //
-        g.getNextVar("tmp"), // 
-        JExpr.direct(varName).invoke("getValueVectorById").arg(JExpr.lit(fieldId)).arg( vvClass.dotclass()));
-        b._if(obj.eq(JExpr._null()))._then()._throw(JExpr._new(t).arg(JExpr.lit(String.format("Failure while loading vector %s with id %d", vv.name(), fieldId))));
-        b.assign(vv, JExpr.cast(vvClass, obj));
-        
-    return vv;
-  }
-  
+
   @Override
   public WritableBatch getWritableBatch() {
     return WritableBatch.get(this);
   }
   
+  private VectorAllocator getAllocator4(ValueVector outgoing){
+    if(outgoing instanceof FixedWidthVector){
+      return new FixedVectorAllocator((FixedWidthVector) outgoing);
+    }else if(outgoing instanceof VariableWidthVector ){
+      return new VariableEstimatedVector( (VariableWidthVector) outgoing, 50);
+    }else{
+      throw new UnsupportedOperationException();
+    }
+  }
+  
+  
   private VectorAllocator getAllocator(ValueVector in, ValueVector outgoing){
     if(outgoing instanceof FixedWidthVector){
       return new FixedVectorAllocator((FixedWidthVector) outgoing);
@@ -273,11 +225,23 @@ public class RemovingRecordBatch implements RecordBatch{
       out.allocateNew(recordCount);
       out.getMutator().setValueCount(recordCount);
     }
-
+  }
+  
+  private class VariableEstimatedVector implements VectorAllocator{
+    VariableWidthVector out;
+    int avgWidth;
     
+    public VariableEstimatedVector(VariableWidthVector out, int avgWidth) {
+      super();
+      this.out = out;
+      this.avgWidth = avgWidth;
+    }
     
+    public void alloc(int recordCount){
+      out.allocateNew(avgWidth * recordCount, recordCount);
+      out.getMutator().setValueCount(recordCount);
+    }
   }
-  
   private class VariableVectorAllocator implements VectorAllocator{
     VariableWidthVector in;
     VariableWidthVector out;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
index 4671baa..88708e2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
@@ -16,7 +16,7 @@ public class SVRemoverCreator implements BatchCreator<SelectionVectorRemover>{
   @Override
   public RecordBatch getBatch(FragmentContext context, SelectionVectorRemover config, List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
-    return new RemovingRecordBatch(children.iterator().next(), context);
+    return new RemovingRecordBatch(config, context, children.iterator().next());
   }
   
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
new file mode 100644
index 0000000..a2584b8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -0,0 +1,78 @@
+package org.apache.drill.exec.record;
+
+import java.util.Iterator;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements RecordBatch{
+  final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
+  
+  protected final VectorContainer container = new VectorContainer();
+  protected final T popConfig;
+  protected final FragmentContext context;
+  
+  protected AbstractRecordBatch(T popConfig, FragmentContext context) {
+    super();
+    this.context = context;
+    this.popConfig = popConfig;
+  }
+  
+  @Override
+  public Iterator<VectorWrapper<?>> iterator() {
+    return container.iterator();
+  }
+
+  @Override
+  public FragmentContext getContext() {
+    return context;
+  }
+
+  @Override
+  public BatchSchema getSchema() {
+    return container.getSchema();
+  }
+
+  @Override
+  public void kill() {
+    container.clear();
+    killIncoming();
+    cleanup();
+  }
+  
+  protected abstract void killIncoming();
+  
+  protected void cleanup(){
+  }
+  
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TypedFieldId getValueVectorId(SchemaPath path) {
+    return container.getValueVector(path);
+  }
+
+  @Override
+  public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
+    return container.getVectorAccessor(fieldId, clazz);
+  }
+
+  
+  @Override
+  public WritableBatch getWritableBatch() {
+    return WritableBatch.get(this);
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
new file mode 100644
index 0000000..10cc7ab
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -0,0 +1,52 @@
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+
+public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> extends AbstractRecordBatch<T> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSingleRecordBatch.class);
+  
+  protected final RecordBatch incoming;
+  
+  public AbstractSingleRecordBatch(T popConfig, FragmentContext context, RecordBatch incoming) {
+    super(popConfig, context);
+    this.incoming = incoming;
+  }
+
+  @Override
+  protected void killIncoming() {
+    incoming.kill();
+  }
+
+  @Override
+  public IterOutcome next() {
+    IterOutcome upstream = incoming.next();
+    
+    switch(upstream){
+    case NONE:
+    case NOT_YET:
+    case STOP:
+      container.clear();
+      return upstream;
+    case OK_NEW_SCHEMA:
+      try{
+        setupNewSchema();
+      }catch(SchemaChangeException ex){
+        kill();
+        logger.error("Failure during query", ex);
+        context.fail(ex);
+        return IterOutcome.STOP;
+      }
+      // fall through.
+    case OK:
+      doWork();
+      return upstream; // change if upstream changed, otherwise normal.
+    default:
+      throw new UnsupportedOperationException();
+    }
+  }
+  
+  protected abstract void setupNewSchema() throws SchemaChangeException;
+  protected abstract void doWork();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
new file mode 100644
index 0000000..e8a5cf8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
@@ -0,0 +1,54 @@
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.vector.ValueVector;
+
+public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<T>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HyperVectorWrapper.class);
+  
+  private T[] vectors;
+  private MaterializedField f;
+
+  public HyperVectorWrapper(MaterializedField f, T[] v){
+    assert(v.length > 0);
+    this.f = f;
+    this.vectors = v;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Class<T> getVectorClass() {
+    return (Class<T>) vectors.getClass().getComponentType();
+  }
+
+  @Override
+  public MaterializedField getField() {
+    return f;
+  }
+
+  @Override
+  public T getValueVector() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public T[] getValueVectors() {
+    return vectors;
+  }
+
+  @Override
+  public boolean isHyper() {
+    return true;
+  }
+
+  @Override
+  public void release() {
+    for(T x : vectors){
+      x.clear();  
+    }
+    
+  }
+  
+  public static <T extends ValueVector> HyperVectorWrapper<T> create(MaterializedField f, T[] v){
+    return new HyperVectorWrapper<T>(f, v);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java
deleted file mode 100644
index d820e0e..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.record;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-
-public class InvalidValueAccessor extends ExecutionSetupException{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InvalidValueAccessor.class);
-
-  public InvalidValueAccessor() {
-    super();
-  }
-
-  public InvalidValueAccessor(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
-    super(message, cause, enableSuppression, writableStackTrace);
-  }
-
-  public InvalidValueAccessor(String message, Throwable cause) {
-    super(message, cause);
-  }
-
-  public InvalidValueAccessor(String message) {
-    super(message);
-  }
-
-  public InvalidValueAccessor(Throwable cause) {
-    super(cause);
-  }
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index fc4e759..f747968 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.record;
 
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -32,7 +31,7 @@ import org.apache.drill.exec.vector.ValueVector;
  * A key thing to know is that the Iterator provided by record batch must align with the rank positions of the field ids
  * provided utilizing getValueVectorId();
  */
-public interface RecordBatch extends Iterable<ValueVector> {
+public interface RecordBatch extends Iterable<VectorWrapper<?>> {
 
   /**
    * Describes the outcome of a RecordBatch being incremented forward.
@@ -93,8 +92,7 @@ public interface RecordBatch extends Iterable<ValueVector> {
    *         TypedFieldId
    */
   public abstract TypedFieldId getValueVectorId(SchemaPath path);
-
-  public abstract <T extends ValueVector> T getValueVectorById(int fieldId, Class<?> clazz);
+  public abstract VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz);
 
   /**
    * Update the data in each Field reading interface for the next range of records. Once a RecordBatch returns an
@@ -112,43 +110,4 @@ public interface RecordBatch extends Iterable<ValueVector> {
    */
   public WritableBatch getWritableBatch();
 
-  public static class TypedFieldId {
-    final MajorType type;
-    final int fieldId;
-
-    public TypedFieldId(MajorType type, int fieldId) {
-      super();
-      this.type = type;
-      this.fieldId = fieldId;
-    }
-
-    public MajorType getType() {
-      return type;
-    }
-
-    public int getFieldId() {
-      return fieldId;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj)
-        return true;
-      if (obj == null)
-        return false;
-      if (getClass() != obj.getClass())
-        return false;
-      TypedFieldId other = (TypedFieldId) obj;
-      if (fieldId != other.fieldId)
-        return false;
-      if (type == null) {
-        if (other.type != null)
-          return false;
-      } else if (!type.equals(other.type))
-        return false;
-      return true;
-    }
-
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index ebacd6e..593c28c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -29,18 +29,17 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
 import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
-import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
 import org.apache.drill.exec.vector.TypeHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
-public class RecordBatchLoader implements Iterable<ValueVector>{
+public class RecordBatchLoader implements Iterable<VectorWrapper<?>>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchLoader.class);
 
-  private List<ValueVector> vectors = Lists.newArrayList();
+  private VectorContainer container = new VectorContainer();
   private final BufferAllocator allocator;
   private int recordCount; 
   private BatchSchema schema;
@@ -66,11 +65,12 @@ public class RecordBatchLoader implements Iterable<ValueVector>{
     boolean schemaChanged = false;
 
     Map<MaterializedField, ValueVector> oldFields = Maps.newHashMap();
-    for(ValueVector v : this.vectors){
+    for(VectorWrapper<?> w : container){
+      ValueVector v = w.getValueVector();
       oldFields.put(v.getField(), v);
     }
     
-    List<ValueVector> newVectors = Lists.newArrayList();
+    VectorContainer newVectors = new VectorContainer();
 
     List<FieldMetadata> fields = def.getFieldList();
     
@@ -79,7 +79,7 @@ public class RecordBatchLoader implements Iterable<ValueVector>{
       FieldDef fieldDef = fmd.getDef();
       ValueVector v = oldFields.remove(fieldDef);
       if(v != null){
-        newVectors.add(v);
+        container.add(v);
         continue;
       }
       
@@ -101,49 +101,51 @@ public class RecordBatchLoader implements Iterable<ValueVector>{
     
     // rebuild the schema.
     SchemaBuilder b = BatchSchema.newBuilder();
-    for(ValueVector v : newVectors){
+    for(VectorWrapper<?> v : newVectors){
       b.addField(v.getField());
     }
     b.setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
     this.schema = b.build();
-    vectors = ImmutableList.copyOf(newVectors);
+    container = newVectors;
     return schemaChanged;
 
   }
 
-  public TypedFieldId getValueVector(SchemaPath path) {
-    for(int i =0; i < vectors.size(); i++){
-      ValueVector vv = vectors.get(i);
-      if(vv.getField().matches(path)) return new TypedFieldId(vv.getField().getType(), i); 
-    }
-    return null;
+  public TypedFieldId getValueVectorId(SchemaPath path) {
+    return container.getValueVector(path);
   }
   
-  @SuppressWarnings("unchecked")
-  public <T extends ValueVector> T getValueVector(int fieldId, Class<?> clazz) {
-    ValueVector v = vectors.get(fieldId);
-    assert v != null;
-    if (v.getClass() != clazz){
-      logger.warn(String.format(
-          "Failure while reading vector.  Expected vector class of %s but was holding vector class %s.",
-          clazz.getCanonicalName(), v.getClass().getCanonicalName()));
-      return null;
-    }
-    return (T) v;
-  }
+  
+  
+//  
+//  @SuppressWarnings("unchecked")
+//  public <T extends ValueVector> T getValueVector(int fieldId, Class<?> clazz) {
+//    ValueVector v = container.get(fieldId);
+//    assert v != null;
+//    if (v.getClass() != clazz){
+//      logger.warn(String.format(
+//          "Failure while reading vector.  Expected vector class of %s but was holding vector class %s.",
+//          clazz.getCanonicalName(), v.getClass().getCanonicalName()));
+//      return null;
+//    }
+//    return (T) v;
+//  }
 
   public int getRecordCount() {
     return recordCount;
   }
 
-
+  public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz){
+    return container.getVectorAccessor(fieldId, clazz);
+  }
+  
   public WritableBatch getWritableBatch(){
-    return WritableBatch.getBatchNoSV(recordCount, vectors);
+    return WritableBatch.getBatchNoSVWrap(recordCount, container);
   }
 
   @Override
-  public Iterator<ValueVector> iterator() {
-    return this.vectors.iterator();
+  public Iterator<VectorWrapper<?>> iterator() {
+    return this.container.iterator();
   }
 
   public BatchSchema getSchema(){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordMaker.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordMaker.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordMaker.java
deleted file mode 100644
index 9bc6e5f..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordMaker.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.record;
-
-public class RecordMaker {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordMaker.class);
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordRemapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordRemapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordRemapper.java
deleted file mode 100644
index 86c963d..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordRemapper.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package org.apache.drill.exec.record;
-
-/**
- * Remove the selection vector from a record batch.
- */
-public class RecordRemapper {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordRemapper.class);
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
new file mode 100644
index 0000000..94700a2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
@@ -0,0 +1,49 @@
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.vector.ValueVector;
+
+public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper<T>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleVectorWrapper.class);
+  
+  private T v;
+
+  public SimpleVectorWrapper(T v){
+    this.v = v;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Class<T> getVectorClass() {
+    return (Class<T>) v.getClass();
+  }
+
+  @Override
+  public MaterializedField getField() {
+    return v.getField();
+  }
+
+  @Override
+  public T getValueVector() {
+    return v;
+  }
+
+  @Override
+  public T[] getValueVectors() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isHyper() {
+    return false;
+  }
+  
+  
+  @Override
+  public void release() {
+    v.clear();
+  }
+
+  public static <T extends ValueVector> SimpleVectorWrapper<T> create(T v){
+    return new SimpleVectorWrapper<T>(v);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
new file mode 100644
index 0000000..3905fa8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
@@ -0,0 +1,58 @@
+package org.apache.drill.exec.record;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+
+public class TypedFieldId {
+  final MajorType type;
+  final int fieldId;
+  final boolean isHyperReader;
+
+  public TypedFieldId(MajorType type, int fieldId){
+    this(type, fieldId, false);
+  }
+  
+  public TypedFieldId(MajorType type, int fieldId, boolean isHyper) {
+    super();
+    this.type = type;
+    this.fieldId = fieldId;
+    this.isHyperReader = isHyper;
+  }
+
+  public boolean isHyperReader(){
+    return isHyperReader;
+  }
+  
+  public MajorType getType() {
+    return type;
+  }
+
+  public int getFieldId() {
+    return fieldId;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    TypedFieldId other = (TypedFieldId) obj;
+    if (fieldId != other.fieldId)
+      return false;
+    if (type == null) {
+      if (other.type != null)
+        return false;
+    } else if (!type.equals(other.type))
+      return false;
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "TypedFieldId [type=" + type + ", fieldId=" + fieldId + ", isSuperReader=" + isHyperReader + "]";
+  }
+
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
new file mode 100644
index 0000000..923fbd5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -0,0 +1,130 @@
+package org.apache.drill.exec.record;
+
+import java.lang.reflect.Array;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.vector.ValueVector;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Preconditions;
+
+public class VectorContainer implements Iterable<VectorWrapper<?>> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorContainer.class);
+
+  private final List<VectorWrapper<?>> wrappers = Lists.newArrayList();
+  private BatchSchema schema;
+
+  public VectorContainer() {
+  }
+
+  public VectorContainer(List<ValueVector> vectors, List<ValueVector[]> hyperVectors) {
+    assert !vectors.isEmpty() || !hyperVectors.isEmpty();
+
+    addCollection(vectors);
+
+    for (ValueVector[] vArr : hyperVectors) {
+      add(vArr);
+    }
+  }
+  
+  public void addHyperList(List<ValueVector> vectors){
+    schema = null;
+    ValueVector[] vv = new ValueVector[vectors.size()];
+    for(int i =0; i < vv.length; i++){
+      vv[i] = vectors.get(i);
+    }
+    add(vv);
+  }
+
+  public void addCollection(Iterable<ValueVector> vectors) {
+    schema = null;
+    for (ValueVector vv : vectors) {
+      wrappers.add(SimpleVectorWrapper.create(vv));
+    }
+  }
+
+  public TypedFieldId add(ValueVector vv) {
+    schema = null;
+    int i = wrappers.size();
+    wrappers.add(SimpleVectorWrapper.create(vv));
+    return new TypedFieldId(vv.getField().getType(), i, false);
+  }
+
+  public void add(ValueVector[] hyperVector) {
+    assert hyperVector.length != 0;
+    schema = null;
+    Class<?> clazz = hyperVector[0].getClass();
+    ValueVector[] c = (ValueVector[]) Array.newInstance(clazz, hyperVector.length);
+    for (int i = 0; i < hyperVector.length; i++) {
+      c[i] = hyperVector[i];
+    }
+    // todo: work with a merged schema.
+    wrappers.add(HyperVectorWrapper.create(hyperVector[0].getField(), c));
+  }
+
+  public void remove(ValueVector v) {
+    schema = null;
+    for (Iterator<VectorWrapper<?>> iter = wrappers.iterator(); iter.hasNext();) {
+      VectorWrapper<?> w = iter.next();
+      if (!w.isHyper() && v == w.getValueVector()) {
+        iter.remove();
+        return;
+      }
+    }
+
+    throw new IllegalStateException("You attempted to remove a vector that didn't exist.");
+  }
+
+  public TypedFieldId getValueVector(SchemaPath path) {
+    for (int i = 0; i < wrappers.size(); i++) {
+      VectorWrapper<?> va = wrappers.get(i);
+      if (va.getField().matches(path))
+        return new TypedFieldId(va.getField().getType(), i, va.isHyper());
+    }
+    return null;
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T extends ValueVector> VectorWrapper<T> getVectorAccessor(int fieldId, Class<?> clazz) {
+    VectorWrapper<?> va = wrappers.get(fieldId);
+    assert va != null;
+    if (va.getVectorClass() != clazz) {
+      logger.warn(String.format(
+          "Failure while reading vector.  Expected vector class of %s but was holding vector class %s.",
+          clazz.getCanonicalName(), va.getVectorClass().getCanonicalName()));
+      return null;
+    }
+    return (VectorWrapper<T>) va;
+  }
+
+  public BatchSchema getSchema(){
+    Preconditions.checkNotNull(schema, "Schema is currently null.  You must call buildSchema(SelectionVectorMode) before this container can return a schema.");
+    return schema;
+  }
+  
+  public void buildSchema(SelectionVectorMode mode) {
+    SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(mode);
+    for (VectorWrapper<?> v : wrappers) {
+      bldr.addField(v.getField());
+    }
+    this.schema = bldr.build();
+  }
+
+  @Override
+  public Iterator<VectorWrapper<?>> iterator() {
+    return wrappers.iterator();
+  }
+
+  public void clear() {
+    // TODO: figure out a better approach for this.
+    // we don't clear schema because we want empty batches to carry previous schema to avoid extra schema update for no data.
+    // schema = null;
+    for (VectorWrapper<?> w : wrappers) {
+      w.release();
+    }
+    wrappers.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
new file mode 100644
index 0000000..e40dee4
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
@@ -0,0 +1,14 @@
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.vector.ValueVector;
+
+
+public interface VectorWrapper<T extends ValueVector> {
+
+  public Class<T> getVectorClass();
+  public MaterializedField getField();
+  public T getValueVector();
+  public T[] getValueVectors();
+  public boolean isHyper();
+  public void release();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 685cc77..e84bf37 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.vector.ValueVector;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 /**
@@ -57,6 +58,15 @@ public class WritableBatch {
     return buffers;
   }
 
+  public static WritableBatch getBatchNoSVWrap(int recordCount, Iterable<VectorWrapper<?>> vws) {
+    List<ValueVector> vectors = Lists.newArrayList();
+    for(VectorWrapper<?> vw : vws){
+      Preconditions.checkArgument(!vw.isHyper());
+      vectors.add(vw.getValueVector());
+    }
+    return getBatchNoSV(recordCount, vectors);
+  }
+  
   public static WritableBatch getBatchNoSV(int recordCount, Iterable<ValueVector> vectors) {
     List<ByteBuf> buffers = Lists.newArrayList();
     List<FieldMetadata> metadata = Lists.newArrayList();
@@ -83,7 +93,7 @@ public class WritableBatch {
   
   public static WritableBatch get(RecordBatch batch) {
     if(batch.getSchema() != null && batch.getSchema().getSelectionVectorMode() != SelectionVectorMode.NONE) throw new UnsupportedOperationException("Only batches without selections vectors are writable.");
-    return getBatchNoSV(batch.getRecordCount(), batch);
+    return getBatchNoSVWrap(batch.getRecordCount(), batch);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
index 1f3874f..2020f92 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
@@ -20,22 +20,58 @@ package org.apache.drill.exec.record.selection;
 
 import io.netty.buffer.ByteBuf;
 
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.record.DeadBuf;
+import org.apache.drill.exec.exception.SchemaChangeException;
 
 public class SelectionVector4 {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector4.class);
 
-  private final BufferAllocator allocator;
-  private ByteBuf buffer = DeadBuf.DEAD_BUFFER;
-
-  public SelectionVector4(BufferAllocator allocator) {
-    this.allocator = allocator;
+  private final ByteBuf vector;
+  private final int recordCount;
+  private int start;
+  private int length;
+  
+  public SelectionVector4(ByteBuf vector, int recordCount, int batchRecordCount) throws SchemaChangeException {
+    if(recordCount > Integer.MAX_VALUE /4) throw new SchemaChangeException(String.format("Currently, Drill can only support allocations up to 2gb in size.  You requested an allocation of %d bytes.", recordCount * 4));
+    this.recordCount = recordCount;
+    this.start = 0;
+    this.length = Math.min(batchRecordCount, recordCount);
+    this.vector = vector;
   }
-
-  public int getCount(){
-    return -1;
+  
+  public int getTotalCount(){
+    return recordCount;
+  }
+  
+  public int getCurrentCount(){
+    return length;
+  }
+  
+  public void set(int index, int compound){
+    vector.setInt(index*4, compound);
+  }
+  public void set(int index, int recordBatch, int recordIndex){
+    vector.setInt(index*4, (recordBatch << 16) | (recordIndex & 65535));
+  }
+  
+  public int get(int index){
+    return vector.getInt(index*4);
   }
 
+  public int getStart() {
+    return start;
+  }
 
+  public int getLength() {
+    return length;
+  }
+  
+  public boolean next(){
+    if(start + length == recordCount) return false;
+    start = start+length;
+    int newEnd = Math.min(start+length, recordCount);
+    length = newEnd - start;
+    return true;
+  }
+  
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4Builder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4Builder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4Builder.java
new file mode 100644
index 0000000..722f4d5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4Builder.java
@@ -0,0 +1,37 @@
+package org.apache.drill.exec.record.selection;
+
+import java.util.List;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+
+import com.google.common.collect.Lists;
+
+public class SelectionVector4Builder {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector4Builder.class);
+  
+  private List<BatchSchema> schemas = Lists.newArrayList();
+  
+  public void add(RecordBatch batch, boolean newSchema) throws SchemaChangeException{
+    if(!schemas.isEmpty() && newSchema) throw new SchemaChangeException("Currently, the sv4 builder doesn't support embedded types");
+    if(newSchema){
+      schemas.add(batch.getSchema());
+    }
+    
+  }
+  
+  
+  // deals with managing selection vectors.
+  // take a four byte int
+  /**
+   * take a four byte value
+   * use the first two as a pointer.  use the other two as a
+   * 
+   *  we should manage an array of valuevectors
+   */
+  
+  private class VectorSchemaBuilder{
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 36fd199..b2283a2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.proto.UserProtos.QueryResult.QueryState;
 import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
 import org.apache.drill.exec.rpc.RpcBus;
 import org.apache.drill.exec.rpc.RpcException;
@@ -54,29 +55,40 @@ public class QueryResultHandler {
     final QueryResult result = RpcBus.get(pBody, QueryResult.PARSER);
     final QueryResultBatch batch = new QueryResultBatch(result, dBody);
     UserResultsListener l = resultsListener.get(result.getQueryId());
+    
+    boolean failed = batch.getHeader().getQueryState() == QueryState.FAILED;
     // logger.debug("For QueryId [{}], retrieved result listener {}", result.getQueryId(), l);
-    if (l != null) {
-      // logger.debug("Results listener available, using existing.");
-      l.resultArrived(batch);
-      if (result.getIsLastChunk()) {
-        resultsListener.remove(result.getQueryId(), l);
-      }
-    } else {
-      logger.debug("Results listener not available, creating a buffering listener.");
-      // manage race condition where we start getting results before we receive the queryid back.
+    if (l == null) {
       BufferingListener bl = new BufferingListener();
       l = resultsListener.putIfAbsent(result.getQueryId(), bl);
-      if (l != null) {
-        l.resultArrived(batch);
-      } else {
-        bl.resultArrived(batch);
-      }
+      // if we had a succesful insert, use that reference.  Otherwise, just throw away the new bufering listener.
+      if (l == null) l = bl;
     }
+      
+    if(failed){
+      l.submissionFailed(new RpcException("Remote failure while running query." + batch.getHeader().getErrorList()));
+      resultsListener.remove(result.getQueryId(), l);
+    }else{
+      l.resultArrived(batch);
+    }
+    
+    if (
+        (failed || result.getIsLastChunk())
+        && 
+        (!(l instanceof BufferingListener) || ((BufferingListener)l).output != null)
+        ) {
+      resultsListener.remove(result.getQueryId(), l);
+    }
+
   }
 
+  
+  
   private class BufferingListener implements UserResultsListener {
 
     private ConcurrentLinkedQueue<QueryResultBatch> results = Queues.newConcurrentLinkedQueue();
+    private volatile boolean finished = false;
+    private volatile RpcException ex;
     private volatile UserResultsListener output;
 
     public boolean transferTo(UserResultsListener l) {
@@ -87,12 +99,19 @@ public class QueryResultHandler {
           l.resultArrived(r);
           last = r.getHeader().getIsLastChunk();
         }
+        if(ex != null){
+          l.submissionFailed(ex);
+          return true;
+        }
         return last;
       }
     }
 
+    
     @Override
     public void resultArrived(QueryResultBatch result) {
+      if(result.getHeader().getIsLastChunk()) finished = true;
+      
       synchronized (this) {
         if (output == null) {
           this.results.add(result);
@@ -104,7 +123,18 @@ public class QueryResultHandler {
 
     @Override
     public void submissionFailed(RpcException ex) {
-      throw new UnsupportedOperationException("You cannot report failed submissions to a buffering listener.");
+      finished = true;
+      synchronized (this) {
+        if (output == null){
+          this.ex = ex;
+        } else{
+          output.submissionFailed(ex);
+        }
+      }
+    }
+    
+    public boolean isFinished(){
+      return finished;
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
index ad54a07..1b1e39a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
@@ -28,6 +28,7 @@ abstract class BaseDataValueVector extends BaseValueVector{
       valueCount = 0;
     }
   }
+
   
   @Override
   public ByteBuf[] getBuffers(){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SerializableVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SerializableVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SerializableVector.java
new file mode 100644
index 0000000..d43c38e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SerializableVector.java
@@ -0,0 +1,7 @@
+package org.apache.drill.exec.vector;
+
+
+public interface SerializableVector extends ValueVector{
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
index 717c087..dfe8e8c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
@@ -32,6 +32,8 @@ import org.apache.drill.exec.record.TransferPair;
  */
 public interface ValueVector extends Closeable {
 
+  public int getBufferSize();
+  
   /**
    * Alternative to clear(). Allows use as closeable in try-with-resources.
    */

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
index 7668bdc..9fd33b9 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
@@ -23,7 +23,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
 import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
 import org.apache.drill.exec.work.foreman.ErrorHelper;
 
-public class AbstractFragmentRunnerListener implements FragmentRunnerListener{
+public abstract class AbstractFragmentRunnerListener implements FragmentRunnerListener{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractFragmentRunnerListener.class);
   
   private FragmentContext context;
@@ -90,9 +90,9 @@ public class AbstractFragmentRunnerListener implements FragmentRunnerListener{
     statusChange(handle, statusBuilder.build());
   }
   
-  protected void statusChange(FragmentHandle handle, FragmentStatus status){
+  protected abstract void statusChange(FragmentHandle handle, FragmentStatus status);
     
-  }
+  
   
   @Override
   public final void fail(FragmentHandle handle, String message, Throwable excep) {
@@ -103,7 +103,6 @@ public class AbstractFragmentRunnerListener implements FragmentRunnerListener{
 
   protected void fail(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
     statusChange(handle, statusBuilder.build());
-    // TODO: ensure the foreman handles the exception
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
index 74fcd2b..ef7bcb1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
@@ -38,8 +38,10 @@ public class RemotingFragmentRunnerListener extends AbstractFragmentRunnerListen
     this.tunnel = tunnel;
   }
   
+  
   @Override
   protected void statusChange(FragmentHandle handle, FragmentStatus status) {
+    logger.debug("Sending remote failure.");
     tunnel.sendFragmentStatus(status);
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
index d4c4014..9a33109 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
@@ -31,11 +31,24 @@ public class ErrorHelper {
     DrillPBError.Builder builder = DrillPBError.newBuilder();
     builder.setEndpoint(endpoint);
     builder.setErrorId(id);
+    StringBuilder sb = new StringBuilder();
     if(message != null){
-      builder.setMessage(message);  
-    }else{
-      builder.setMessage(t.getMessage());
+      sb.append(message);
     }
+      
+    do{
+      sb.append(" < ");
+      sb.append(t.getClass().getSimpleName());
+      if(t.getMessage() != null){
+        sb.append(":[ ");
+        sb.append(t.getMessage());
+        sb.append(" ]");
+      }
+    }while(t.getCause() != null && t.getCause() != t);
+    
+    builder.setMessage(sb.toString());
+    
+
     builder.setErrorType(0);
     
     // record the error to the log for later reference.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
index 65fc8c7..af91a6b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
@@ -35,6 +35,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
 import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
 import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserProtos.QueryResult;
 import org.apache.drill.exec.proto.UserProtos.QueryResult.QueryState;
 import org.apache.drill.exec.rpc.RpcException;
@@ -61,6 +62,7 @@ class RunningFragmentManager implements FragmentStatusListener{
   private ForemanManagerListener foreman;
   private AtomicInteger remainingFragmentCount;
   private FragmentRunner rootRunner;
+  private volatile QueryId queryId;
   
   public RunningFragmentManager(ForemanManagerListener foreman, TunnelManager tun) {
     super();
@@ -72,6 +74,7 @@ class RunningFragmentManager implements FragmentStatusListener{
 
   public void runFragments(WorkerBee bee, PlanFragment rootFragment, FragmentRoot rootOperator, UserClientConnection rootClient, List<PlanFragment> leafFragments) throws ExecutionSetupException{
     remainingFragmentCount.set(leafFragments.size()+1);
+    queryId = rootFragment.getHandle().getQueryId();
 
     // set up the root fragment first so we'll have incoming buffers available.
     {
@@ -146,7 +149,7 @@ class RunningFragmentManager implements FragmentStatusListener{
   private void fail(FragmentStatus status){
     updateStatus(status);
     stopQuery();
-    QueryResult result = QueryResult.newBuilder().setQueryState(QueryState.FAILED).build();
+    QueryResult result = QueryResult.newBuilder().setQueryId(queryId).setQueryState(QueryState.FAILED).addError(status.getError()).build();
     foreman.cleanupAndSendResult(result);
   }
  
@@ -262,7 +265,7 @@ class RunningFragmentManager implements FragmentStatusListener{
 
     @Override
     protected void statusChange(FragmentHandle handle, FragmentStatus status) {
-      RunningFragmentManager.this.updateStatus(status);
+      RunningFragmentManager.this.statusUpdate(status);
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/EscapeTest1.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/EscapeTest1.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/EscapeTest1.java
new file mode 100644
index 0000000..a41a883
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/EscapeTest1.java
@@ -0,0 +1,181 @@
+package org.apache.drill.exec.expr;
+
+import java.nio.ByteBuffer;
+import java.nio.IntBuffer;
+
+public final class EscapeTest1 {
+  
+  public static class Timer{
+    long n1;
+    String name;
+    public Timer(String name){
+      this.n1 = System.nanoTime();
+      this.name = name;
+    }
+    
+    public void print(long sum){
+      System.out.println(String.format("Completed %s in %d ms.  Output was %d", name, (System.nanoTime() - n1)/1000/1000, sum));
+    }
+  }
+  
+  public static Timer time(String name){
+    return new Timer(name);
+  }
+  
+  public static void main(String args[]){
+    EscapeTest1 et = new EscapeTest1();
+    Monkey m = new Monkey();
+    for(int i =0; i < 10; i++){
+      time("noalloc").print(et.noAlloc());
+      time("alloc").print(et.alloc());
+      time("set noalloc").print(et.setNoAlloc(m));
+      time("set alloc").print(et.setAlloc(m));
+      time("get noalloc").print(et.getNoAlloc(m));
+      time("get alloc").print(et.getAlloc(m));
+      time("get return alloc").print(et.getReturnAlloc(m));
+    }
+  }
+  
+  public long noAlloc(){
+    long sum = 0;
+    for(int i =0; i < 1000000000; i++){
+      sum+= add(i+1, i+2);
+    }
+    return sum;
+  }
+  
+  public long alloc(){
+    long sum = 0;
+    for(int i =0; i < 1000000000; i++){
+      Ad ad = new Ad(i+1, i+2); 
+      sum+= add(ad.x, ad.y);
+    }
+    return sum;
+  }
+  
+  public long setAlloc(Monkey m){
+    long sum = 0;
+    for(int i =0; i < 490000000; i++){
+      EH h = new EH(i+1, i+2);
+      m.set(h);
+      sum += i;
+    }
+    return sum; 
+  }
+  
+  public long setNoAlloc(Monkey m){
+    long sum = 0;
+    for(int i =0; i < 490000000; i++){
+      m.set(i+1, i+2);
+      sum += i;
+    }
+    return sum; 
+  }
+  
+  public long getAlloc(Monkey m){
+    long sum = 0;
+    for(int i =0; i < 490000000; i++){
+      RR r = new RR();
+      m.get(i, i+1, r);
+      sum += r.v1 + r.v2;
+    }
+    return sum; 
+  }
+  
+  public long getNoAlloc(Monkey m){
+    long sum = 0;
+    for(int i =0; i < 490000000; i++){
+      int i1 = m.getV1(i);
+      int i2 = m.getV2(i+1);
+      sum += i1 + i2;
+    }
+    return sum; 
+  }
+  
+  public long getReturnAlloc(Monkey m){
+    long sum = 0;
+    for(int i =0; i < 490000000; i++){
+      RR r = m.get(i, i+1);
+      sum += r.v1 + r.v2;
+    }
+    return sum; 
+  }
+  
+  
+  public class Ad{
+    long x;
+    long y;
+    public Ad(long x, long y) {
+      super();
+      this.x = x;
+      this.y = y;
+    }
+  }
+
+  
+  public static final class EH{
+    int index;
+    int value;
+    public EH(int index, int value) {
+      super();
+      this.index = index;
+      this.value = value;
+    }
+  }
+  
+  public static final class RR{
+    int v1;
+    int v2;
+    
+    public RR(){
+      
+    }
+    public RR(int v1, int v2) {
+      super();
+      this.v1 = v1;
+      this.v2 = v2;
+    }
+  }
+  
+  public long add(long a, long b){
+    return a + b;
+  }
+  
+  
+  public final static class Monkey{
+    final IntBuffer buf;
+    
+    public Monkey(){
+      ByteBuffer bb = ByteBuffer.allocateDirect(Integer.MAX_VALUE);
+      buf = bb.asIntBuffer();
+    }
+    
+    public final void set(int index, int value){
+      buf.put(index, value);
+    }
+    
+    public final void set(EH a){
+      buf.put(a.index, a.value);
+    }
+    
+    public final int getV1(int index){
+      return buf.get(index);
+    }
+    
+    public final int getV2(int index){
+      return buf.get(index);
+    }
+    
+    public final RR get(int index1, int index2){
+      return new RR(buf.get(index1), buf.get(index2));
+    }
+    
+    public final void get(int index1, int index2, RR rr){
+      rr.v1 = buf.get(index1);
+      rr.v2 = buf.get(index2);
+    }
+    
+  }
+  
+  
+}


[3/3] git commit: Sort operator using HyperBatch concept. Supports single and multikey asc/desc sort. One comparator function definition. Add abstract record batch, vector container and vector wrapper. Various fixes to return to client when query fails.

Posted by ja...@apache.org.
Sort operator using HyperBatch concept.  Supports single and multikey asc/desc sort.
One comparator function definition.
Add abstract record batch, vector container and vector wrapper.
Various fixes to return to client when query fails.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/db3afaa8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/db3afaa8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/db3afaa8

Branch: refs/heads/master
Commit: db3afaa854fc8475592907dba97162ecf869f9df
Parents: 31f1964
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sun Aug 4 15:47:31 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue Aug 6 14:37:27 2013 -0700

----------------------------------------------------------------------
 .../drill/common/expression/FunctionCall.java   |   1 +
 .../templates/NullableValueVectors.java         |   2 +-
 .../templates/RepeatedValueVectors.java         |   2 +-
 .../templates/VariableLengthVectors.java        |   5 +-
 .../apache/drill/exec/expr/CodeGenerator.java   |  39 ++-
 .../drill/exec/expr/EvaluationVisitor.java      |  62 ++---
 .../exec/expr/ExpressionTreeMaterializer.java   |   2 +-
 .../exec/expr/ValueVectorReadExpression.java    |  33 ++-
 .../exec/expr/ValueVectorWriteExpression.java   |  13 +-
 .../drill/exec/expr/fn/impl/Alternator.java     |  53 ++--
 .../exec/expr/fn/impl/ComparatorFunctions.java  |  58 +++++
 .../apache/drill/exec/ops/FragmentContext.java  |   5 +-
 .../apache/drill/exec/physical/base/Sender.java |   1 +
 .../exec/physical/impl/FilterRecordBatch.java   | 130 ----------
 .../impl/FilteringRecordBatchTransformer.java   |  58 -----
 .../drill/exec/physical/impl/ImplCreator.java   |   9 +
 .../drill/exec/physical/impl/RootExec.java      |   1 -
 .../drill/exec/physical/impl/ScanBatch.java     |  26 +-
 .../drill/exec/physical/impl/ScreenCreator.java |   4 +-
 .../exec/physical/impl/SingleSenderCreator.java |   2 +-
 .../drill/exec/physical/impl/VectorHolder.java  |  39 ---
 .../exec/physical/impl/WireRecordBatch.java     |  15 +-
 .../physical/impl/filter/FilterRecordBatch.java | 129 ++--------
 .../impl/materialize/RecordMaterializer.java    |   6 -
 .../impl/project/ProjectRecordBatch.java        | 141 ++---------
 .../exec/physical/impl/sort/Comparator.java     |  11 +
 .../physical/impl/sort/ReadIndexRewriter.java   |  87 +++++++
 .../physical/impl/sort/RecordBatchData.java     |  47 ++++
 .../exec/physical/impl/sort/SortBatch.java      | 171 +++++++++++++
 .../physical/impl/sort/SortBatchCreator.java    |  23 ++
 .../impl/sort/SortRecordBatchBuilder.java       | 130 ++++++++++
 .../exec/physical/impl/sort/SortTemplate.java   |  45 ++++
 .../drill/exec/physical/impl/sort/Sorter.java   |  17 ++
 .../exec/physical/impl/svremover/Copier.java    |   7 +-
 .../physical/impl/svremover/CopierTemplate.java |  43 ----
 .../impl/svremover/CopierTemplate2.java         |  43 ++++
 .../impl/svremover/CopierTemplate4.java         |  47 ++++
 .../impl/svremover/RemovingRecordBatch.java     | 246 ++++++++-----------
 .../impl/svremover/SVRemoverCreator.java        |   2 +-
 .../drill/exec/record/AbstractRecordBatch.java  |  78 ++++++
 .../exec/record/AbstractSingleRecordBatch.java  |  52 ++++
 .../drill/exec/record/HyperVectorWrapper.java   |  54 ++++
 .../drill/exec/record/InvalidValueAccessor.java |  46 ----
 .../apache/drill/exec/record/RecordBatch.java   |  45 +---
 .../drill/exec/record/RecordBatchLoader.java    |  66 ++---
 .../apache/drill/exec/record/RecordMaker.java   |  22 --
 .../drill/exec/record/RecordRemapper.java       |   8 -
 .../drill/exec/record/SimpleVectorWrapper.java  |  49 ++++
 .../apache/drill/exec/record/TypedFieldId.java  |  58 +++++
 .../drill/exec/record/VectorContainer.java      | 130 ++++++++++
 .../apache/drill/exec/record/VectorWrapper.java |  14 ++
 .../apache/drill/exec/record/WritableBatch.java |  12 +-
 .../exec/record/selection/SelectionVector4.java |  56 ++++-
 .../selection/SelectionVector4Builder.java      |  37 +++
 .../drill/exec/rpc/user/QueryResultHandler.java |  60 +++--
 .../drill/exec/vector/BaseDataValueVector.java  |   1 +
 .../drill/exec/vector/SerializableVector.java   |   7 +
 .../apache/drill/exec/vector/ValueVector.java   |   2 +
 .../work/AbstractFragmentRunnerListener.java    |   7 +-
 .../work/RemotingFragmentRunnerListener.java    |   2 +
 .../drill/exec/work/foreman/ErrorHelper.java    |  19 +-
 .../work/foreman/RunningFragmentManager.java    |   7 +-
 .../org/apache/drill/exec/expr/EscapeTest1.java | 181 ++++++++++++++
 .../apache/drill/exec/expr/ExpressionTest.java  |  22 +-
 .../exec/physical/impl/SimpleRootExec.java      |  16 +-
 .../physical/impl/TestSimpleFragmentRun.java    |  11 +-
 .../exec/physical/impl/sort/TestSimpleSort.java | 145 +++++++++++
 .../record/ExpressionTreeMaterializerTest.java  |   1 -
 .../src/test/resources/sort/one_key_sort.json   |  44 ++++
 .../src/test/resources/sort/two_key_sort.json   |  54 ++++
 .../drill/sql/client/full/BatchListener.java    |  17 +-
 .../drill/sql/client/full/BatchLoaderMap.java   |   5 +-
 .../apache/drill/jdbc/test/FullEngineTest.java  |  10 +
 .../org/apache/drill/jdbc/test/JdbcTest.java    |   5 +
 74 files changed, 2150 insertions(+), 948 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/FunctionCall.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/FunctionCall.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/FunctionCall.java
index d27b584..d343fd6 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/FunctionCall.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/FunctionCall.java
@@ -76,4 +76,5 @@ public class FunctionCall extends LogicalExpressionBase implements Iterable<Logi
   }
 
   
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
index 644019e..ca222df 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
@@ -68,7 +68,7 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
     values.clear();
   }
   
-  int getBufferSize(){
+  public int getBufferSize(){
     return values.getBufferSize() + bits.getBufferSize();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
index 26564c1..79f05b6 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
@@ -59,7 +59,7 @@ import com.google.common.collect.Lists;
     return values.getValueCapacity();
   }
   
-  int getBufferSize(){
+  public int getBufferSize(){
     return offsets.getBufferSize() + values.getBufferSize();
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
index 95965f7..4492aa9 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
@@ -49,7 +49,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     this.offsetVector = new UInt${type.width}Vector(null, allocator);
   }
 
-
+  public int getBufferSize(){
+    return offsetVector.getBufferSize() + data.writerIndex();
+  }
+  
   int getSizeFromCount(int valueCount) {
     return valueCount * ${type.width};
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
index 67ca614..9895782 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
@@ -12,14 +12,18 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.TypeHelper;
 
 import com.google.common.base.Preconditions;
 import com.sun.codemodel.JBlock;
+import com.sun.codemodel.JClass;
 import com.sun.codemodel.JClassAlreadyExistsException;
 import com.sun.codemodel.JCodeModel;
 import com.sun.codemodel.JDefinedClass;
 import com.sun.codemodel.JExpr;
+import com.sun.codemodel.JExpression;
 import com.sun.codemodel.JFieldRef;
 import com.sun.codemodel.JMethod;
 import com.sun.codemodel.JMod;
@@ -61,11 +65,42 @@ public class CodeGenerator<T> {
   }
   
 
+  public JVar declareVectorValueSetupAndMember(String batchName, TypedFieldId fieldId){
+    Class<?> valueVectorClass = TypeHelper.getValueVectorClass(fieldId.getType().getMinorType(), fieldId.getType().getMode());
+    JClass vvClass = model.ref(valueVectorClass);
+    JClass retClass = vvClass;
+    String vectorAccess = "getValueVector";
+    if(fieldId.isHyperReader()){
+      retClass = retClass.array();
+      vectorAccess = "getValueVectors";
+    }
+    
+    JVar vv = declareClassField("vv", retClass);
+    JClass t = model.ref(SchemaChangeException.class);
+    JType wrapperClass = model.ref(VectorWrapper.class);
+    JType objClass = model.ref(Object.class);
+    JBlock b = getSetupBlock();
+    JVar obj = b.decl( //
+        objClass, //
+        getNextVar("tmp"), // 
+        JExpr.direct(batchName)
+          .invoke("getValueAccessorById") //
+          .arg(JExpr.lit(fieldId.getFieldId())) //
+          .arg( vvClass.dotclass())
+          .invoke(vectorAccess)//
+          );
+        
+        b._if(obj.eq(JExpr._null()))._then()._throw(JExpr._new(t).arg(JExpr.lit(String.format("Failure while loading vector %s with id: %s.", vv.name(), fieldId.toString()))));
+        //b.assign(vv, JExpr.cast(retClass, ((JExpression) JExpr.cast(wrapperClass, obj) ).invoke(vectorAccess)));
+        b.assign(vv, JExpr.cast(retClass, obj ));
+        
+    return vv;
+  }
 
-  public void addExpr(LogicalExpression ex){
+  public HoldingContainer addExpr(LogicalExpression ex){
     logger.debug("Adding next write {}", ex);
     rotateBlock();
-    ex.accept(evaluationVisitor, this);
+    return ex.accept(evaluationVisitor, this);
   }
   
   public void rotateBlock(){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
index 03a2a6b..93d5b5b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
@@ -1,7 +1,5 @@
 package org.apache.drill.exec.expr;
 
-import com.google.common.base.Charsets;
-import com.sun.codemodel.*;
 import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.expression.IfExpression;
 import org.apache.drill.common.expression.IfExpression.IfCondition;
@@ -13,15 +11,17 @@ import org.apache.drill.common.expression.ValueExpressions.LongExpression;
 import org.apache.drill.common.expression.ValueExpressions.QuotedString;
 import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
 import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.expr.CodeGenerator.HoldingContainer;
 import org.apache.drill.exec.expr.fn.FunctionHolder;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.vector.TypeHelper;
 
-import com.google.common.base.Preconditions;
+import com.sun.codemodel.JBlock;
+import com.sun.codemodel.JConditional;
+import com.sun.codemodel.JExpr;
+import com.sun.codemodel.JExpression;
+import com.sun.codemodel.JInvocation;
+import com.sun.codemodel.JVar;
 
 public class EvaluationVisitor extends AbstractExprVisitor<HoldingContainer, CodeGenerator<?>, RuntimeException> {
 
@@ -122,7 +122,7 @@ public class EvaluationVisitor extends AbstractExprVisitor<HoldingContainer, Cod
   @Override
   public HoldingContainer visitUnknown(LogicalExpression e, CodeGenerator<?> generator) throws RuntimeException {
     if(e instanceof ValueVectorReadExpression){
-      return visitValueVectorExpression((ValueVectorReadExpression) e, generator);
+      return visitValueVectorReadExpression((ValueVectorReadExpression) e, generator);
     }else if(e instanceof ValueVectorWriteExpression){
       return visitValueVectorWriteExpression((ValueVectorWriteExpression) e, generator);
     }else if(e instanceof ReturnValueExpression){
@@ -138,16 +138,7 @@ public class EvaluationVisitor extends AbstractExprVisitor<HoldingContainer, Cod
     HoldingContainer hc = child.accept(this, generator);
     JBlock block = generator.getBlock();
     
-    Class<?> vvClass = TypeHelper.getValueVectorClass(child.getMajorType().getMinorType(), child.getMajorType().getMode());
-    JType vvType = generator.getModel()._ref(vvClass);
-    JVar vv = generator.declareClassField("vv", vvType);
-    
-    // get value vector in setup block.
-    JVar obj = generator.getSetupBlock().decl( //
-        generator.getModel()._ref(Object.class), //
-        generator.getNextVar("obj"), // 
-        JExpr.direct("outgoing").invoke("getValueVectorById").arg(JExpr.lit(e.getFieldId())).arg( ((JClass)vvType).dotclass()));
-    generator.getSetupBlock().assign(vv, JExpr.cast(vvType, obj));
+    JVar vv = generator.declareVectorValueSetupAndMember("outgoing", e.getFieldId());
     
     if(hc.isOptional()){
       vv.invoke("getMutator").invoke("set").arg(JExpr.direct("outIndex"));
@@ -162,42 +153,42 @@ public class EvaluationVisitor extends AbstractExprVisitor<HoldingContainer, Cod
     return null;
   }
   
-  private HoldingContainer visitValueVectorExpression(ValueVectorReadExpression e, CodeGenerator<?> generator) throws RuntimeException{
+  private HoldingContainer visitValueVectorReadExpression(ValueVectorReadExpression e, CodeGenerator<?> generator) throws RuntimeException{
     // declare value vector
-    Class<?> vvClass = TypeHelper.getValueVectorClass(e.getMajorType().getMinorType(), e.getMajorType().getMode());
-    JType vvType = generator.getModel()._ref(vvClass);
-    JVar vv1 = generator.declareClassField("vv", vvType);
     
-    // get value vector from incoming batch and 
-    // get value vector in setup block.
-    JVar obj = generator.getSetupBlock().decl( //
-        generator.getModel()._ref(Object.class), //
-        generator.getNextVar("obj"), // 
-        JExpr.direct("incoming").invoke("getValueVectorById").arg(JExpr.lit(e.getFieldId())).arg( ((JClass)vvType).dotclass()));
-    generator.getSetupBlock().assign(vv1, JExpr.cast(vvType, obj));
-
+    JVar vv1 = generator.declareVectorValueSetupAndMember("incoming", e.getFieldId());
+    final String indexVariableName = e.isNamed() ? e.getIndexVariableName() : "inIndex";
+    JExpression indexVariable = JExpr.direct(indexVariableName);
+    
+    JInvocation getValueAccessor = vv1.invoke("getAccessor").invoke("get");
+    if(e.isSuperReader()){
+      
+      getValueAccessor = ((JExpression) vv1.component(indexVariable.shrz(JExpr.lit(16)))).invoke("getAccessor").invoke("get");
+      indexVariable = indexVariable.band(JExpr.lit((int) Character.MAX_VALUE));
+    }
+    
+    
     // evaluation work.
     HoldingContainer out = generator.declare(e.getMajorType());
     
-    
     if(out.isOptional()){
       JBlock blk = generator.getBlock();
-      blk.assign(out.getIsSet(), vv1.invoke("getAccessor").invoke("isSet").arg(JExpr.direct("inIndex")));
+      blk.assign(out.getIsSet(), vv1.invoke("getAccessor").invoke("isSet").arg(indexVariable));
       JConditional jc = blk._if(out.getIsSet().eq(JExpr.lit(1)));
       if (e.getMajorType().getMinorType() == TypeProtos.MinorType.VARCHAR ||
           e.getMajorType().getMinorType() == TypeProtos.MinorType.VARBINARY) {
         jc._then()
-            .add(vv1.invoke("getAccessor").invoke("get").arg(JExpr.direct("inIndex")).arg(out.getHolder()));
+            .add(getValueAccessor.arg(JExpr.direct("inIndex")).arg(out.getHolder()));
       } else {
         jc._then()
-            .assign(out.getValue(), vv1.invoke("getAccessor").invoke("get").arg(JExpr.direct("inIndex")));
+            .assign(out.getValue(), getValueAccessor.arg(indexVariable));
       }
     }else{
       if (e.getMajorType().getMinorType() == TypeProtos.MinorType.VARCHAR ||
           e.getMajorType().getMinorType() == TypeProtos.MinorType.VARBINARY) {
-        generator.getBlock().add(vv1.invoke("getAccessor").invoke("get").arg(JExpr.direct("inIndex")).arg(out.getHolder()));
+        generator.getBlock().add(getValueAccessor.arg(indexVariable).arg(out.getHolder()));
       } else {
-        generator.getBlock().assign(out.getValue(), vv1.invoke("getAccessor").invoke("get").arg(JExpr.direct("inIndex")));
+        generator.getBlock().assign(out.getValue(), getValueAccessor.arg(indexVariable));
       }
     }
     return out;
@@ -215,6 +206,7 @@ public class EvaluationVisitor extends AbstractExprVisitor<HoldingContainer, Cod
   @Override
   public HoldingContainer visitQuotedStringConstant(QuotedString e, CodeGenerator<?> CodeGenerator) throws RuntimeException {
     throw new UnsupportedOperationException("We don't yet support string literals as we need to build the string value holders.");
+    
 //    JExpr stringLiteral = JExpr.lit(e.value);
 //    CodeGenerator.block.decl(stringLiteral.invoke("getBytes").arg(JExpr.ref(Charsets.UTF_8));
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
index 72e5c93..07db72c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
@@ -30,7 +30,7 @@ import org.apache.drill.common.expression.ValueExpressions;
 import org.apache.drill.common.expression.visitors.SimpleExprVisitor;
 import org.apache.drill.exec.record.NullExpression;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
+import org.apache.drill.exec.record.TypedFieldId;
 
 import com.google.common.collect.Lists;
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
index b1888d6..f86bd29 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
@@ -4,19 +4,42 @@ import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.visitors.ExprVisitor;
 import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
+import org.apache.drill.exec.record.TypedFieldId;
 
 public class ValueVectorReadExpression implements LogicalExpression{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ValueVectorReadExpression.class);
 
   private final MajorType type;
-  private final int fieldId;
+  private final TypedFieldId fieldId;
+  private final boolean superReader;
+  private final String indexVariableName;
   
-  public ValueVectorReadExpression(TypedFieldId tfId) {
+  public ValueVectorReadExpression(TypedFieldId tfId){
+    this(tfId, null);
+  }
+  
+  public ValueVectorReadExpression(TypedFieldId tfId, String indexVariableName) {
     this.type = tfId.getType();
-    this.fieldId = tfId.getFieldId();
+    this.fieldId = tfId;
+    this.superReader = tfId.isHyperReader();
+    this.indexVariableName = indexVariableName;
   }
 
+  public boolean isNamed(){
+    return indexVariableName != null;
+  }
+  
+  public String getIndexVariableName(){
+    return indexVariableName;
+  }
+  
+  public TypedFieldId getTypedFieldId(){
+    return fieldId;
+  }
+  
+  public boolean isSuperReader(){
+    return superReader;
+  }
   @Override
   public MajorType getMajorType() {
     return type;
@@ -27,7 +50,7 @@ public class ValueVectorReadExpression implements LogicalExpression{
     return visitor.visitUnknown(this, value);
   }
 
-  public int getFieldId() {
+  public TypedFieldId getFieldId() {
     return fieldId;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorWriteExpression.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorWriteExpression.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorWriteExpression.java
index 417b975..58141d8 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorWriteExpression.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorWriteExpression.java
@@ -5,19 +5,26 @@ import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.visitors.ExprVisitor;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.record.TypedFieldId;
 
 public class ValueVectorWriteExpression implements LogicalExpression {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ValueVectorWriteExpression.class);
 
-  private final int fieldId;
+  private final TypedFieldId fieldId;
   private final LogicalExpression child;
+  private final String indexVariableName;
   
-  public ValueVectorWriteExpression(int fieldId, LogicalExpression child){
+  public ValueVectorWriteExpression(TypedFieldId fieldId, LogicalExpression child){
+    this(fieldId, child, null);
+  }
+  
+  public ValueVectorWriteExpression(TypedFieldId fieldId, LogicalExpression child, String indexVariableName){
     this.fieldId = fieldId;
     this.child = child;
+    this.indexVariableName = indexVariableName;
   }
   
-  public int getFieldId() {
+  public TypedFieldId getFieldId() {
     return fieldId;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Alternator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Alternator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Alternator.java
index 0915fa3..d2bcaa2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Alternator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Alternator.java
@@ -16,24 +16,48 @@ import org.apache.drill.exec.vector.BigIntHolder;
 
 
 
-@FunctionTemplate(name = "alternate", scope = FunctionScope.SIMPLE)
-public class Alternator implements DrillFunc{
-
-  @Workspace int val;
-  @Output BigIntHolder out;
+public class Alternator {
   
-  public void setup(RecordBatch incoming) {
-    val = 0;
-  }
+  @FunctionTemplate(name = "alternate", scope = FunctionScope.SIMPLE)
+  public static class Alternate2 implements DrillFunc{
+    @Workspace int val;
+    @Output BigIntHolder out;
+    
+    public void setup(RecordBatch incoming) {
+      val = 0;
+    }
+
 
+    public void eval() {
+      out.value = val;
+      if(val == 0){
+        val = 1;
+      }else{
+        val = 0;
+      }
+    }
+  }
 
-  public void eval() {
-    out.value = val;
-    if(val == 0){
-      val = 1;
-    }else{
+  @FunctionTemplate(name = "alternate3", scope = FunctionScope.SIMPLE)
+  public static class Alternate3 implements DrillFunc{
+    @Workspace int val;
+    @Output BigIntHolder out;
+    
+    public void setup(RecordBatch incoming) {
       val = 0;
     }
+
+
+    public void eval() {
+      out.value = val;
+      if(val == 0){
+        val = 1;
+      }else if(val == 1){
+        val = 2;
+      }else{
+        val = 0;
+      }
+    }
   }
   
   public static class Provider implements CallProvider{
@@ -41,7 +65,8 @@ public class Alternator implements DrillFunc{
     @Override
     public FunctionDefinition[] getFunctionDefintions() {
       return new FunctionDefinition[]{
-          FunctionDefinition.simple("alternate", NoArgValidator.VALIDATOR, new OutputTypeDeterminer.FixedType(Types.required(MinorType.BIGINT)), "alternate")
+          FunctionDefinition.simple("alternate", NoArgValidator.VALIDATOR, new OutputTypeDeterminer.FixedType(Types.required(MinorType.BIGINT)), "alternate"),
+          FunctionDefinition.simple("alternate3", NoArgValidator.VALIDATOR, new OutputTypeDeterminer.FixedType(Types.required(MinorType.BIGINT)), "alternate3")
       };
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/ComparatorFunctions.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/ComparatorFunctions.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/ComparatorFunctions.java
new file mode 100644
index 0000000..471bdf0
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/ComparatorFunctions.java
@@ -0,0 +1,58 @@
+package org.apache.drill.exec.expr.fn.impl;
+
+import org.apache.drill.common.expression.ArgumentValidators;
+import org.apache.drill.common.expression.CallProvider;
+import org.apache.drill.common.expression.FunctionDefinition;
+import org.apache.drill.common.expression.OutputTypeDeterminer;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.DrillFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.vector.BigIntHolder;
+import org.apache.drill.exec.vector.IntHolder;
+
+public class ComparatorFunctions {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ComparatorFunctions.class);
+  
+  @FunctionTemplate(name = "compare_to", scope = FunctionTemplate.FunctionScope.SIMPLE)
+  public static class IntComparator implements DrillFunc {
+
+      @Param IntHolder left;
+      @Param IntHolder right;
+      @Output IntHolder out;
+
+      public void setup(RecordBatch b) {}
+
+      public void eval() {
+        out.value = left.value < right.value ? -1 : ((left.value == right.value)? 0 : 1);
+      }
+  }
+  
+  @FunctionTemplate(name = "compare_to", scope = FunctionTemplate.FunctionScope.SIMPLE)
+  public static class Long implements DrillFunc {
+
+      @Param BigIntHolder left;
+      @Param BigIntHolder right;
+      @Output IntHolder out;
+
+      public void setup(RecordBatch b) {}
+
+      public void eval() {
+        out.value = left.value < right.value ? -1 : ((left.value == right.value)? 0 : 1);
+      }
+  }
+  public static final FunctionDefinition COMPARE_TO = FunctionDefinition.simple("compare_to", new ArgumentValidators.AllowedTypeList(2, Types.required(MinorType.INT)), new OutputTypeDeterminer.FixedType(Types.required(MinorType.INT)));
+  public static class Provider implements CallProvider{
+
+    @Override
+    public FunctionDefinition[] getFunctionDefintions() {
+      return new FunctionDefinition[]{
+          COMPARE_TO
+      };
+    }
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 64f3eb4..bbe4cfb 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -19,16 +19,13 @@ package org.apache.drill.exec.ops;
 
 import java.io.IOException;
 
-import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.exec.compile.ClassTransformer;
 import org.apache.drill.exec.compile.QueryClassLoader;
-import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.metrics.SingleThreadNestedCounter;
-import org.apache.drill.exec.physical.impl.FilteringRecordBatchTransformer;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
@@ -106,7 +103,7 @@ public class FragmentContext {
   public <T> T getImplementationClass(CodeGenerator<T> cg) throws ClassTransformationException, IOException{
     long t1 = System.nanoTime();
     T t= transformer.getImplementationClass(this.loader, cg.getDefinition(), cg.generate(), cg.getMaterializedClassName());
-    logger.debug("Compile time: {} micros.", (System.nanoTime() - t1)/1000/1000 );
+    logger.debug("Compile time: {} millis.", (System.nanoTime() - t1)/1000/1000 );
     return t;
     
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java
index 71513c7..77d3f69 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java
@@ -41,4 +41,5 @@ public interface Sender extends FragmentRoot {
    */
   @JsonProperty("receiver-major-fragment")
   public int getOppositeMajorFragmentId();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java
deleted file mode 100644
index acabe30..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.physical.impl;
-
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.ValueVector;
-
-public abstract class FilterRecordBatch implements RecordBatch {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
-
-  private RecordBatch incoming;
-  private SelectionVector2 selectionVector;
-  private BatchSchema schema;
-  private FilteringRecordBatchTransformer transformer;
-  private int outstanding;
-
-  public FilterRecordBatch(RecordBatch batch) {
-    this.incoming = batch;
-  }
-
-  @Override
-  public FragmentContext getContext() {
-    return incoming.getContext();
-  }
-
-  @Override
-  public BatchSchema getSchema() {
-    return schema;
-  }
-
-  @Override
-  public int getRecordCount() {
-    return 0;
-  }
-
-  @Override
-  public void kill() {
-    incoming.kill();
-  }
-
-
-  @Override
-  public SelectionVector2 getSelectionVector2() {
-    return null;
-  }
-
-  @Override
-  public SelectionVector4 getSelectionVector4() {
-    return null;
-  }
-
-  @Override
-  public TypedFieldId getValueVectorId(SchemaPath path) {
-    return null;
-  }
-
-  @Override
-  public <T extends ValueVector> T getValueVectorById(int fieldId, Class<?> vvClass) {
-    return null;
-  }
-
-  public WritableBatch getWritableBatch() {
-    return null;
-  }
-
-  abstract int applyFilter(SelectionVector2 vector, int count);
-
-  /**
-   * Release all assets.
-   */
-  private void close() {
-
-  }
-
-  @Override
-  public IterOutcome next() {
-    while (true) {
-      IterOutcome o = incoming.next();
-      switch (o) {
-      case OK_NEW_SCHEMA:
-        transformer = null;
-        schema = transformer.getSchema();
-        // fall through to ok.
-      case OK:
-
-      case NONE:
-      case STOP:
-        close();
-        return IterOutcome.STOP;
-      }
-
-      if (outstanding > 0) {
-        // move data to output location.
-
-        for (int i = incoming.getRecordCount() - outstanding; i < incoming.getRecordCount(); i++) {
-
-        }
-      }
-
-//      // make sure the bit vector is as large as the current record batch.
-//      if (selectionVector.capacity() < incoming.getRecordCount()) {
-//        selectionVector.allocateNew(incoming.getRecordCount());
-//      }
-
-      return null;
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilteringRecordBatchTransformer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilteringRecordBatchTransformer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilteringRecordBatchTransformer.java
deleted file mode 100644
index 0813481..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilteringRecordBatchTransformer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.physical.impl;
-
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-
-public abstract class FilteringRecordBatchTransformer {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilteringRecordBatchTransformer.class);
-  
-  final RecordBatch incoming;
-  final SelectionVector2 selectionVector;
-  final BatchSchema schema;
-  
-  public FilteringRecordBatchTransformer(RecordBatch incoming, OutputMutator output, SelectionVector2 selectionVector) {
-    super();
-    this.incoming = incoming;
-    this.selectionVector = selectionVector;
-    this.schema = innerSetup();
-  }
-
-  public abstract BatchSchema innerSetup();
-  
-  /**
-   * Applies the filter to the selection index.  Ignores any values in the selection vector, instead creating a.
-   * @return
-   */
-  public abstract int apply();
-  
-  /**
-   * Applies the filter to the selection index.  Utilizes the existing selection index and only evaluates on those records.
-   * @return
-   */
-  public abstract int applyWithSelection();
-
-  public BatchSchema getSchema() {
-    return schema;
-  }
-  
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 7740955..c31e9e4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -34,8 +34,10 @@ import org.apache.drill.exec.physical.config.RandomReceiver;
 import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.config.SelectionVectorRemover;
 import org.apache.drill.exec.physical.config.SingleSender;
+import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator;
 import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator;
+import org.apache.drill.exec.physical.impl.sort.SortBatchCreator;
 import org.apache.drill.exec.physical.impl.svremover.SVRemoverCreator;
 import org.apache.drill.exec.record.RecordBatch;
 
@@ -52,6 +54,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
   private ProjectBatchCreator pbc = new ProjectBatchCreator();
   private FilterBatchCreator fbc = new FilterBatchCreator();
   private SVRemoverCreator svc = new SVRemoverCreator();
+  private SortBatchCreator sbc = new SortBatchCreator();
   private RootExec root = null;
   
   private ImplCreator(){}
@@ -88,6 +91,12 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
     }
   }
 
+  
+  @Override
+  public RecordBatch visitSort(Sort sort, FragmentContext context) throws ExecutionSetupException {
+    return sbc.getBatch(context, sort, getChildren(sort, context));
+  }
+
   @Override
   public RecordBatch visitScreen(Screen op, FragmentContext context) throws ExecutionSetupException {
     Preconditions.checkArgument(root == null);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
index 3f8aac7..9bcfe10 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
@@ -17,7 +17,6 @@
  ******************************************************************************/
 package org.apache.drill.exec.physical.impl;
 
-import org.apache.drill.exec.exception.FragmentSetupException;
 
 /**
  * A FragmentRoot is a node which is the last processing node in a query plan. FragmentTerminals include Exchange

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index a8a62ca..5a543b0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.physical.impl;
 
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -29,13 +28,15 @@ import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SchemaBuilder;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.vector.ValueVector;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 /**
@@ -44,10 +45,9 @@ import com.google.common.collect.Maps;
 public class ScanBatch implements RecordBatch {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
 
-  final List<ValueVector> vectors = Lists.newLinkedList();
   final Map<MaterializedField, ValueVector> fieldVectorMap = Maps.newHashMap();
 
-  private VectorHolder holder = new VectorHolder(vectors);
+  private VectorContainer holder = new VectorContainer();
   private BatchSchema schema;
   private int recordCount;
   private boolean schemaChanged = true;
@@ -91,9 +91,7 @@ public class ScanBatch implements RecordBatch {
   }
 
   private void releaseAssets() {
-    for (ValueVector v : vectors) {
-      v.close();
-    }
+    holder.clear();
   }
 
   @Override
@@ -139,10 +137,12 @@ public class ScanBatch implements RecordBatch {
   }
 
   @Override
-  public <T extends ValueVector> T getValueVectorById(int fieldId, Class<?> clazz) {
-    return holder.getValueVector(fieldId, clazz);
+  public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
+    return holder.getVectorAccessor(fieldId, clazz);
   }
 
+
+
   private class Mutator implements OutputMutator {
     private SchemaBuilder builder = BatchSchema.newBuilder();
 
@@ -150,12 +150,12 @@ public class ScanBatch implements RecordBatch {
       schemaChanged();
       ValueVector vector = fieldVectorMap.remove(field);
       if (vector == null) throw new SchemaChangeException("Failure attempting to remove an unknown field.");
-      vectors.remove(vector);
+      holder.remove(vector);
       vector.close();
     }
 
     public void addField(ValueVector vector) {
-      vectors.add(vector);
+      holder.add(vector);
       fieldVectorMap.put(vector.getField(), vector);
       builder.addField(vector.getField());
     }
@@ -169,8 +169,8 @@ public class ScanBatch implements RecordBatch {
   }
 
   @Override
-  public Iterator<ValueVector> iterator() {
-    return vectors.iterator();
+  public Iterator<VectorWrapper<?>> iterator() {
+    return holder.iterator();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 516b5af..5c5e2e5 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -17,8 +17,6 @@
  ******************************************************************************/
 package org.apache.drill.exec.physical.impl;
 
-import io.netty.buffer.ByteBuf;
-
 import java.util.List;
 
 import org.apache.drill.exec.ops.FragmentContext;
@@ -27,7 +25,6 @@ import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
 import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer;
 import org.apache.drill.exec.physical.impl.materialize.VectorRecordMaterializer;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
 import org.apache.drill.exec.proto.UserProtos.QueryResult;
 import org.apache.drill.exec.record.RecordBatch;
@@ -45,6 +42,7 @@ public class ScreenCreator implements RootCreator<Screen>{
   
   @Override
   public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch> children) {
+    Preconditions.checkNotNull(children);
     Preconditions.checkArgument(children.size() == 1);
     return new ScreenRoot(context, children.iterator().next());
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 231bc6c..bad3a03 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -76,8 +76,8 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
         tunnel.sendRecordBatch(new RecordSendFailure(), context, b2);
         return false;
 
-      case OK:
       case OK_NEW_SCHEMA:
+      case OK:
         FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
         tunnel.sendRecordBatch(new RecordSendFailure(), context, batch);
         return true;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/VectorHolder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/VectorHolder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/VectorHolder.java
deleted file mode 100644
index 65a7365..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/VectorHolder.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package org.apache.drill.exec.physical.impl;
-
-import java.util.List;
-
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
-import org.apache.drill.exec.vector.ValueVector;
-
-public class VectorHolder {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorHolder.class);
-  
-  private List<ValueVector> vectors;
-
-  public VectorHolder(List<ValueVector> vectors) {
-    super();
-    this.vectors = vectors;
-  }
-  
-  public TypedFieldId getValueVector(SchemaPath path) {
-    for(int i =0; i < vectors.size(); i++){
-      ValueVector vv = vectors.get(i);
-      if(vv.getField().matches(path)) return new TypedFieldId(vv.getField().getType(), i); 
-    }
-    return null;
-  }
-  
-  @SuppressWarnings("unchecked")
-  public <T extends ValueVector> T getValueVector(int fieldId, Class<?> clazz) {
-    ValueVector v = vectors.get(fieldId);
-    assert v != null;
-    if (v.getClass() != clazz){
-      logger.warn(String.format(
-          "Failure while reading vector.  Expected vector class of %s but was holding vector class %s.",
-          clazz.getCanonicalName(), v.getClass().getCanonicalName()));
-      return null;
-    }
-    return (T) v;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
index a575f69..93f643d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -24,11 +24,12 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
 import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.InvalidValueAccessor;
 import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.record.RawFragmentBatchProvider;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -70,7 +71,7 @@ public class WireRecordBatch implements RecordBatch{
   }
   
   @Override
-  public Iterator<ValueVector> iterator() {
+  public Iterator<VectorWrapper<?>> iterator() {
     return batchLoader.iterator();
   }
 
@@ -86,14 +87,14 @@ public class WireRecordBatch implements RecordBatch{
 
   @Override
   public TypedFieldId getValueVectorId(SchemaPath path) {
-    return batchLoader.getValueVector(path);
+    return batchLoader.getValueVectorId(path);
   }
-
+  
   @Override
-  public <T extends ValueVector> T getValueVectorById(int fieldId, Class<?> clazz){
-    return batchLoader.getValueVector(fieldId, clazz);
+  public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
+    return batchLoader.getValueAccessorById(fieldId, clazz);
   }
-  
+
   @Override
   public IterOutcome next() {
     RawFragmentBatch batch = fragProvider.getNext();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index babc66e..806ead6 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -1,144 +1,68 @@
 package org.apache.drill.exec.physical.impl.filter;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.Filter;
-import org.apache.drill.exec.physical.impl.VectorHolder;
-import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.SchemaBuilder;
 import org.apache.drill.exec.record.TransferPair;
-import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.ValueVector;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-public class FilterRecordBatch implements RecordBatch{
+public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
 
-  private final Filter filterConfig;
-  private final RecordBatch incoming;
-  private final FragmentContext context;
   private final SelectionVector2 sv;
-  private BatchSchema outSchema;
   private Filterer filter;
-  private List<ValueVector> outputVectors;
-  private VectorHolder vh;
   
   public FilterRecordBatch(Filter pop, RecordBatch incoming, FragmentContext context){
-    this.filterConfig = pop;
-    this.incoming = incoming;
-    this.context = context;
+    super(pop, context, incoming);
     sv = new SelectionVector2(context.getAllocator());
   }
   
-  
   @Override
   public FragmentContext getContext() {
     return context;
   }
 
   @Override
-  public BatchSchema getSchema() {
-    Preconditions.checkNotNull(outSchema);
-    return outSchema;
-  }
-
-  @Override
   public int getRecordCount() {
     return sv.getCount();
   }
 
   @Override
-  public void kill() {
-    incoming.kill();
-  }
-
-  
-  @Override
-  public Iterator<ValueVector> iterator() {
-    return outputVectors.iterator();
-  }
-
-  @Override
   public SelectionVector2 getSelectionVector2() {
     return sv;
   }
 
   @Override
-  public SelectionVector4 getSelectionVector4() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public TypedFieldId getValueVectorId(SchemaPath path) {
-    return vh.getValueVector(path);
-  }
-
-  @Override
-  public <T extends ValueVector> T getValueVectorById(int fieldId, Class<?> clazz) {
-    return vh.getValueVector(fieldId, clazz);
-  }
-
-  @Override
-  public IterOutcome next() {
-    
-    IterOutcome upstream = incoming.next();
-    logger.debug("Upstream... {}", upstream);
-    switch(upstream){
-    case NONE:
-    case NOT_YET:
-    case STOP:
-      return upstream;
-    case OK_NEW_SCHEMA:
-      try{
-        filter = createNewFilterer();
-      }catch(SchemaChangeException ex){
-        incoming.kill();
-        logger.error("Failure during query", ex);
-        context.fail(ex);
-        return IterOutcome.STOP;
-      }
-      // fall through.
-    case OK:
-      int recordCount = incoming.getRecordCount();
-      sv.allocateNew(recordCount);
-      filter.filterBatch(recordCount);
-      for(ValueVector v : this.outputVectors){
-        ValueVector.Mutator m = v.getMutator();
-        m.setValueCount(recordCount);
-      }
-      return upstream; // change if upstream changed, otherwise normal.
-    default:
-      throw new UnsupportedOperationException();
+  protected void doWork() {
+    int recordCount = incoming.getRecordCount();
+    sv.allocateNew(recordCount);
+    filter.filterBatch(recordCount);
+    for(VectorWrapper<?> v : container){
+      ValueVector.Mutator m = v.getValueVector().getMutator();
+      m.setValueCount(recordCount);
     }
   }
   
-
-  private Filterer createNewFilterer() throws SchemaChangeException{
-    if(outputVectors != null){
-      for(ValueVector v : outputVectors){
-        v.close();
-      }
-    }
-    this.outputVectors = Lists.newArrayList();
-    this.vh = new VectorHolder(outputVectors);
-    LogicalExpression filterExpression = filterConfig.getExpr();
+  @Override
+  protected void setupNewSchema() throws SchemaChangeException {
+    container.clear();
+    LogicalExpression filterExpression = popConfig.getExpr();
     final ErrorCollector collector = new ErrorCollectorImpl();
     final List<TransferPair> transfers = Lists.newArrayList();
     final CodeGenerator<Filterer> cg = new CodeGenerator<Filterer>(Filterer.TEMPLATE_DEFINITION, context.getFunctionRegistry());
@@ -150,32 +74,21 @@ public class FilterRecordBatch implements RecordBatch{
     
     cg.addExpr(new ReturnValueExpression(expr));
     
-    for(ValueVector v : incoming){
-      TransferPair pair = v.getTransferPair();
-      outputVectors.add(pair.getTo());
+    for(VectorWrapper<?> v : incoming){
+      TransferPair pair = v.getValueVector().getTransferPair();
+      container.add(pair.getTo());
       transfers.add(pair);
     }
     
-    SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(SelectionVectorMode.TWO_BYTE);
-    for(ValueVector v : outputVectors){
-      bldr.addField(v.getField());
-    }
-    this.outSchema = bldr.build();
+    container.buildSchema(SelectionVectorMode.TWO_BYTE);
     
     try {
       TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]);
-      Filterer filterer = context.getImplementationClass(cg);
-      filterer.setup(context, incoming, this, tx);
-      return filterer;
+      this.filter = context.getImplementationClass(cg);
+      filter.setup(context, incoming, this, tx);
     } catch (ClassTransformationException | IOException e) {
       throw new SchemaChangeException("Failure while attempting to load generated class", e);
     }
   }
   
-  @Override
-  public WritableBatch getWritableBatch() {
-    return WritableBatch.get(this);
-  }
-  
-  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
index 17c65e9..b62e52b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
@@ -17,12 +17,6 @@
  ******************************************************************************/
 package org.apache.drill.exec.physical.impl.materialize;
 
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserProtos.QueryResult;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.WritableBatch;
 
 public interface RecordMaterializer {
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 29660cb..8f06290 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -1,7 +1,6 @@
 package org.apache.drill.exec.physical.impl.project;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.drill.common.expression.ErrorCollector;
@@ -20,19 +19,15 @@ import org.apache.drill.exec.expr.ValueVectorReadExpression;
 import org.apache.drill.exec.expr.ValueVectorWriteExpression;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.Project;
-import org.apache.drill.exec.physical.impl.VectorHolder;
 import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
 import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
 import org.apache.drill.exec.proto.SchemaDefProtos.NamePart.Type;
-import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.SchemaBuilder;
 import org.apache.drill.exec.record.TransferPair;
-import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.TypeHelper;
 import org.apache.drill.exec.vector.ValueVector;
@@ -40,118 +35,39 @@ import org.apache.drill.exec.vector.ValueVector;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-public class ProjectRecordBatch implements RecordBatch{
+public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
 
-  private final Project pop;
-  private final RecordBatch incoming;
-  private final FragmentContext context;
-  private BatchSchema outSchema;
   private Projector projector;
   private List<ValueVector> allocationVectors;
-  private List<ValueVector> outputVectors;
-  private VectorHolder vh;
-  
   
   public ProjectRecordBatch(Project pop, RecordBatch incoming, FragmentContext context){
-    this.pop = pop;
-    this.incoming = incoming;
-    this.context = context;
+    super(pop, context, incoming);
   }
   
   @Override
-  public Iterator<ValueVector> iterator() {
-    return outputVectors.iterator();
-  }
-
-  @Override
-  public FragmentContext getContext() {
-    return context;
-  }
-
-  @Override
-  public BatchSchema getSchema() {
-    Preconditions.checkNotNull(outSchema);
-    return outSchema;
-  }
-
-  @Override
   public int getRecordCount() {
     return incoming.getRecordCount();
   }
 
   @Override
-  public void kill() {
-    incoming.kill();
-  }
-
-  @Override
-  public SelectionVector2 getSelectionVector2() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public SelectionVector4 getSelectionVector4() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public TypedFieldId getValueVectorId(SchemaPath path) {
-    return vh.getValueVector(path);
-  }
-
-  @Override
-  public <T extends ValueVector> T getValueVectorById(int fieldId, Class<?> clazz) {
-    return vh.getValueVector(fieldId, clazz);
-  }
-
-  @Override
-  public IterOutcome next() {
-    
-    IterOutcome upstream = incoming.next();
-    logger.debug("Upstream... {}", upstream);
-    switch(upstream){
-    case NONE:
-    case NOT_YET:
-    case STOP:
-      return upstream;
-    case OK_NEW_SCHEMA:
-      try{
-        projector = createNewProjector();
-      }catch(SchemaChangeException ex){
-        incoming.kill();
-        logger.error("Failure during query", ex);
-        context.fail(ex);
-        return IterOutcome.STOP;
-      }
-      // fall through.
-    case OK:
-      int recordCount = incoming.getRecordCount();
-      for(ValueVector v : this.allocationVectors){
-        AllocationHelper.allocate(v, recordCount, 50);
-      }
-      projector.projectRecords(recordCount, 0);
-      for(ValueVector v : this.outputVectors){
-        ValueVector.Mutator m = v.getMutator();
-        m.setValueCount(recordCount);
-      }
-      return upstream; // change if upstream changed, otherwise normal.
-    default:
-      throw new UnsupportedOperationException();
+  protected void doWork() {
+    int recordCount = incoming.getRecordCount();
+    for(ValueVector v : this.allocationVectors){
+      AllocationHelper.allocate(v, recordCount, 50);
+    }
+    projector.projectRecords(recordCount, 0);
+    for(VectorWrapper<?> v : container){
+      ValueVector.Mutator m = v.getValueVector().getMutator();
+      m.setValueCount(recordCount);
     }
   }
-  
 
-  private Projector createNewProjector() throws SchemaChangeException{
+  @Override
+  protected void setupNewSchema() throws SchemaChangeException{
     this.allocationVectors = Lists.newArrayList();
-    if(outputVectors != null){
-      for(ValueVector v : outputVectors){
-        v.close();
-      }
-    }
-    this.outputVectors = Lists.newArrayList();
-    this.vh = new VectorHolder(outputVectors);
-    final List<NamedExpression> exprs = pop.getExprs();
+    container.clear();
+    final List<NamedExpression> exprs = popConfig.getExprs();
     final ErrorCollector collector = new ErrorCollectorImpl();
     final List<TransferPair> transfers = Lists.newArrayList();
     
@@ -168,43 +84,34 @@ public class ProjectRecordBatch implements RecordBatch{
       // add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack.
       if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE){
         ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
-        ValueVector vvIn = incoming.getValueVectorById(vectorRead.getFieldId(), TypeHelper.getValueVectorClass(vectorRead.getMajorType().getMinorType(), vectorRead.getMajorType().getMode()));
+        ValueVector vvIn = incoming.getValueAccessorById(vectorRead.getFieldId().getFieldId(), TypeHelper.getValueVectorClass(vectorRead.getMajorType().getMinorType(), vectorRead.getMajorType().getMode())).getValueVector();
         Preconditions.checkNotNull(incoming);
 
         TransferPair tp = vvIn.getTransferPair();
         transfers.add(tp);
-        outputVectors.add(tp.getTo());
+        container.add(tp.getTo());
+        logger.debug("Added transfer.");
       }else{
         // need to do evaluation.
         ValueVector vector = TypeHelper.getNewVector(outputField, context.getAllocator());
         allocationVectors.add(vector);
-        outputVectors.add(vector);
-        ValueVectorWriteExpression write = new ValueVectorWriteExpression(outputVectors.size() - 1, expr);
+        ValueVectorWriteExpression write = new ValueVectorWriteExpression(container.add(vector), expr);
         cg.addExpr(write);
+        logger.debug("Added eval.");
       }
       
     }
     
-    SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(incoming.getSchema().getSelectionVectorMode());
-    for(ValueVector v : outputVectors){
-      bldr.addField(v.getField());
-    }
-    this.outSchema = bldr.build();
+    container.buildSchema(incoming.getSchema().getSelectionVectorMode());
     
     try {
-      Projector projector = context.getImplementationClass(cg);
+      this.projector = context.getImplementationClass(cg);
       projector.setup(context, incoming, this, transfers);
-      return projector;
     } catch (ClassTransformationException | IOException e) {
       throw new SchemaChangeException("Failure while attempting to load generated class", e);
     }
   }
   
-  @Override
-  public WritableBatch getWritableBatch() {
-    return WritableBatch.get(this);
-  }
-  
   private MaterializedField getMaterializedField(FieldReference reference, LogicalExpression expr){
     return new MaterializedField(getFieldDef(reference, expr.getMajorType()));
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Comparator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Comparator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Comparator.java
new file mode 100644
index 0000000..86d0d61
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Comparator.java
@@ -0,0 +1,11 @@
+package org.apache.drill.exec.physical.impl.sort;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+
+public interface Comparator {
+  
+  public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
+  public abstract int doEval(int inIndex, int outIndex);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/ReadIndexRewriter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/ReadIndexRewriter.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/ReadIndexRewriter.java
new file mode 100644
index 0000000..83d43b2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/ReadIndexRewriter.java
@@ -0,0 +1,87 @@
+package org.apache.drill.exec.physical.impl.sort;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.IfExpression;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
+import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
+import org.apache.drill.common.expression.ValueExpressions.LongExpression;
+import org.apache.drill.common.expression.ValueExpressions.QuotedString;
+import org.apache.drill.common.expression.visitors.ExprVisitor;
+import org.apache.drill.exec.expr.ValueVectorReadExpression;
+
+import com.google.common.collect.Lists;
+
+public class ReadIndexRewriter implements ExprVisitor<LogicalExpression, String, RuntimeException> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ReadIndexRewriter.class);
+
+  private String batchName;
+  
+  
+  @Override
+  public LogicalExpression visitUnknown(LogicalExpression e, String newIndexName) {
+    if (e instanceof ValueVectorReadExpression) {
+      ValueVectorReadExpression old = (ValueVectorReadExpression) e;
+      return new ValueVectorReadExpression(old.getTypedFieldId(), newIndexName);
+    } else {
+      throw new UnsupportedOperationException(String.format(
+          "ReadIndex rewriter doesn't know how to rewrite expression of type %s.", e.getClass().getName()));
+    }
+  }
+
+  @Override
+  public LogicalExpression visitFunctionCall(FunctionCall call, String newIndexName) {
+    List<LogicalExpression> args = Lists.newArrayList();
+    for (int i = 0; i < call.args.size(); ++i) {
+      LogicalExpression newExpr = call.args.get(i).accept(this, null);
+      args.add(newExpr);
+    }
+
+    return new FunctionCall(call.getDefinition(), args, call.getPosition());
+  }
+
+  @Override
+  public LogicalExpression visitIfExpression(IfExpression ifExpr, String newIndexName) {
+    List<IfExpression.IfCondition> conditions = Lists.newArrayList(ifExpr.iterator());
+    LogicalExpression newElseExpr = ifExpr.elseExpression.accept(this, null);
+
+    for (int i = 0; i < conditions.size(); ++i) {
+      IfExpression.IfCondition condition = conditions.get(i);
+
+      LogicalExpression newCondition = condition.condition.accept(this, null);
+      LogicalExpression newExpr = condition.expression.accept(this, null);
+      conditions.set(i, new IfExpression.IfCondition(newCondition, newExpr));
+    }
+
+    return IfExpression.newBuilder().setElse(newElseExpr).addConditions(conditions).build();
+  }
+
+  @Override
+  public LogicalExpression visitSchemaPath(SchemaPath path, String newIndexName) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public LogicalExpression visitLongConstant(LongExpression intExpr, String value) throws RuntimeException {
+    return intExpr;
+  }
+
+  @Override
+  public LogicalExpression visitDoubleConstant(DoubleExpression dExpr, String value) throws RuntimeException {
+    return dExpr;
+  }
+
+  @Override
+  public LogicalExpression visitBooleanConstant(BooleanExpression e, String value) throws RuntimeException {
+    return e;
+  }
+
+  @Override
+  public LogicalExpression visitQuotedStringConstant(QuotedString e, String value) throws RuntimeException {
+    return e;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
new file mode 100644
index 0000000..a21af09
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
@@ -0,0 +1,47 @@
+package org.apache.drill.exec.physical.impl.sort;
+
+import java.util.List;
+
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.ValueVector;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Holds the data for a particular record batch for later manipulation.
+ */
+class RecordBatchData {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchData.class);
+  
+  final List<ValueVector> vectors = Lists.newArrayList();
+  final SelectionVector2 sv2;
+  final int recordCount;
+  
+  public RecordBatchData(RecordBatch batch){
+    this.sv2 = batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE ? batch.getSelectionVector2().clone() : null;
+    
+    for(VectorWrapper<?> v : batch){
+      if(v.isHyper()) throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch.");
+      TransferPair tp = v.getValueVector().getTransferPair();
+      tp.transfer();
+      vectors.add(tp.getTo());
+    }
+    
+    recordCount = batch.getRecordCount();
+  }
+  
+  public int getRecordCount(){
+    return recordCount;
+  }
+  public List<ValueVector> getVectors() {
+    return vectors;
+  }
+
+  public SelectionVector2 getSv2() {
+    return sv2;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
new file mode 100644
index 0000000..e361e38
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -0,0 +1,171 @@
+package org.apache.drill.exec.physical.impl.sort;
+
+import java.io.IOException;
+
+import org.apache.drill.common.defs.OrderDef;
+import org.apache.drill.common.defs.OrderDef.Direction;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.CodeGenerator.HoldingContainer;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.fn.impl.ComparatorFunctions;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+import com.google.common.collect.ImmutableList;
+import com.sun.codemodel.JConditional;
+import com.sun.codemodel.JExpr;
+
+public class SortBatch extends AbstractRecordBatch<Sort> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortBatch.class);
+
+  private static long MAX_SORT_BYTES = 8l * 1024 * 1024 * 1024;
+
+  private final RecordBatch incoming;
+  private SortRecordBatchBuilder builder;
+  private SelectionVector4 sv4;
+  private Sorter sorter;
+  private BatchSchema schema;
+  
+  public SortBatch(Sort popConfig, FragmentContext context, RecordBatch incoming) {
+    super(popConfig, context);
+    this.incoming = incoming;
+  }
+
+  @Override
+  public int getRecordCount() {
+    return sv4.getLength();
+  }
+
+  @Override
+  public void kill() {
+    incoming.kill();
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    return sv4;
+  }
+
+  @Override
+  public IterOutcome next() {
+    if(builder != null){
+      if(sv4.next()){
+        return IterOutcome.OK;
+      }else{
+        return IterOutcome.NONE;
+      }
+    }
+    
+    
+    try{
+      outer: while (true) {
+        IterOutcome upstream = incoming.next();
+        switch (upstream) {
+        case NONE:
+          break outer;
+        case NOT_YET:
+        case STOP:
+          container.clear();
+          return upstream;
+        case OK_NEW_SCHEMA:
+          // only change in the case that the schema truly changes.  Artificial schema changes are ignored.
+          if(!incoming.getSchema().equals(schema)){
+            if (builder != null) throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+            builder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES, container);
+            this.schema = incoming.getSchema();
+          }
+          // fall through.
+        case OK:
+          if(!builder.add(incoming)){
+            throw new UnsupportedOperationException("Sort doesn't currently support doing an external sort.");
+          };
+          break;
+        default:
+          throw new UnsupportedOperationException();
+        }
+      }
+      
+      builder.build(context);
+      sv4 = builder.getSv4();
+
+      sorter = createNewSorter();
+      sorter.setup(context, this);
+      long t1 = System.nanoTime();
+      sorter.sort(sv4, container);
+      logger.debug("Sorted {} records in {} micros.", sv4.getTotalCount(), (System.nanoTime() - t1)/1000);
+      
+      return IterOutcome.OK_NEW_SCHEMA;
+      
+    }catch(SchemaChangeException | ClassTransformationException | IOException ex){
+      kill();
+      logger.error("Failure during query", ex);
+      context.fail(ex);
+      return IterOutcome.STOP;
+    }
+  }
+
+  
+  private Sorter createNewSorter() throws ClassTransformationException, IOException, SchemaChangeException{
+    CodeGenerator<Sorter> g = new CodeGenerator<Sorter>(Sorter.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+    
+    for(OrderDef od : popConfig.getOrderings()){
+      // first, we rewrite the evaluation stack for each side of the comparison.
+      ErrorCollector collector = new ErrorCollectorImpl(); 
+      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), this, collector);
+      if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+      ReadIndexRewriter rewriter = new ReadIndexRewriter();
+      LogicalExpression left = expr.accept(rewriter, "inIndex");
+      LogicalExpression right = expr.accept(rewriter, "outIndex");
+      
+      // next we wrap the two comparison sides and add the expression block for the comparison.
+      FunctionCall f = new FunctionCall(ComparatorFunctions.COMPARE_TO, ImmutableList.of(left, right), ExpressionPosition.UNKNOWN);
+      HoldingContainer out = g.addExpr(f);
+      JConditional jc = g.getBlock()._if(out.getValue().ne(JExpr.lit(0)));
+      
+      //TODO: is this the right order...
+      if(od.getDirection() == Direction.ASC){
+        jc._then()._return(out.getValue());
+      }else{
+        jc._then()._return(out.getValue().minus());
+      }
+    }
+    
+    g.getBlock()._return(JExpr.lit(0));
+    
+    return context.getImplementationClass(g);
+
+
+  }
+  
+  @Override
+  public WritableBatch getWritableBatch() {
+    throw new UnsupportedOperationException("A sort batch is not writable.");
+  }
+
+  @Override
+  protected void killIncoming() {
+    incoming.kill();
+  }
+
+  
+  
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java
new file mode 100644
index 0000000..0a671a8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java
@@ -0,0 +1,23 @@
+package org.apache.drill.exec.physical.impl.sort;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import com.google.common.base.Preconditions;
+
+public class SortBatchCreator implements BatchCreator<Sort>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortBatchCreator.class);
+
+  @Override
+  public RecordBatch getBatch(FragmentContext context, Sort config, List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.size() == 1);
+    return new SortBatch(config, context, children.iterator().next());
+  }
+  
+  
+}