You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/07/20 03:58:02 UTC

[41/53] [abbrv] Types transition

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
index 0f4619c..7b76d05 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
@@ -18,15 +18,18 @@
 package org.apache.drill.exec.physical.config;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.MockScanPOP.MockColumn;
 import org.apache.drill.exec.physical.config.MockScanPOP.MockScanEntry;
 import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.proto.SchemaDefProtos.DataMode;
-import org.apache.drill.exec.proto.SchemaDefProtos.MajorType;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.vector.TypeHelper;
+import org.apache.drill.exec.record.vector.ValueVector;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.NonRepeatedMutator;
@@ -57,12 +60,12 @@ public class MockRecordReader implements RecordReader {
     return x;
   }
 
-  private ValueVector getVector(int fieldId, String name, MajorType type, int length) {
+  private ValueVector<?> getVector(String name, MajorType type, int length) {
     assert context != null : "Context shouldn't be null.";
     if(type.getMode() != DataMode.REQUIRED) throw new UnsupportedOperationException();
     
-    MaterializedField f = MaterializedField.create(new SchemaPath(name), fieldId, 0, type);
-    ValueVector v;
+    MaterializedField f = MaterializedField.create(new SchemaPath(name, ExpressionPosition.UNKNOWN), type);
+    ValueVector<?> v;
     v = TypeHelper.getNewVector(f, context.getAllocator());
     if(v instanceof FixedWidthVector){
       ((FixedWidthVector)v).allocateNew(length);  
@@ -85,8 +88,8 @@ public class MockRecordReader implements RecordReader {
       batchRecordCount = 250000 / estimateRowSize;
 
       for (int i = 0; i < config.getTypes().length; i++) {
-        valueVectors[i] = getVector(i, config.getTypes()[i].getName(), config.getTypes()[i].getMajorType(), batchRecordCount);
-        output.addField(i, valueVectors[i]);
+        valueVectors[i] = getVector(config.getTypes()[i].getName(), config.getTypes()[i].getMajorType(), batchRecordCount);
+        output.addField(valueVectors[i]);
       }
       output.setNewSchema();
     } catch (SchemaChangeException e) {
@@ -128,7 +131,7 @@ public class MockRecordReader implements RecordReader {
   public void cleanup() {
     for (int i = 0; i < valueVectors.length; i++) {
       try {
-        output.removeField(valueVectors[i].getField().getFieldId());
+        output.removeField(valueVectors[i].getField());
       } catch (SchemaChangeException e) {
         logger.warn("Failure while trying to remove field.", e);
       }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanPOP.java
index 40227e5..3802ce2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanPOP.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanPOP.java
@@ -21,6 +21,9 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.physical.ReadEntry;
@@ -29,10 +32,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Scan;
 import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.proto.SchemaDefProtos.DataMode;
-import org.apache.drill.exec.proto.SchemaDefProtos.MajorType;
-import org.apache.drill.exec.proto.SchemaDefProtos.MinorType;
-import org.apache.drill.exec.vector.TypeHelper;
+import org.apache.drill.exec.record.vector.TypeHelper;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java
index 6440d98..80d48f4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java
@@ -17,12 +17,21 @@
  ******************************************************************************/
 package org.apache.drill.exec.physical.impl;
 
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.InvalidValueAccessor;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
+import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.vector.SelectionVector;
+<<<<<<< HEAD
 import org.apache.drill.exec.vector.ValueVector;
+=======
+import org.apache.drill.exec.record.vector.SelectionVector2;
+import org.apache.drill.exec.record.vector.SelectionVector4;
+import org.apache.drill.exec.record.vector.ValueVector;
+>>>>>>> Build working
 
 public abstract class FilterRecordBatch implements RecordBatch {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
@@ -57,8 +66,33 @@ public abstract class FilterRecordBatch implements RecordBatch {
     incoming.kill();
   }
 
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    return null;
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    return null;
+  }
+
+  @Override
+  public TypedFieldId getValueVector(SchemaPath path) {
+    return null;
+  }
+
+  @Override
+  public <T extends ValueVector<T>> T getValueVectorById(int fieldId, Class<?> vvClass) {
+    return null;
+  }
+
   @Override
+<<<<<<< HEAD
   public <T extends ValueVector> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor {
+=======
+  public WritableBatch getWritableBatch() {
+>>>>>>> Build working
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
index b3b9f5f..5247d08 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
@@ -18,10 +18,19 @@
 package org.apache.drill.exec.physical.impl;
 
 import org.apache.drill.exec.exception.SchemaChangeException;
+<<<<<<< HEAD
 import org.apache.drill.exec.vector.ValueVector;
 
 public interface OutputMutator {
   public void removeField(int fieldId) throws SchemaChangeException;
   public void addField(int fieldId, ValueVector vector) throws SchemaChangeException ;
+=======
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.vector.ValueVector;
+
+public interface OutputMutator {
+  public void removeField(MaterializedField field) throws SchemaChangeException;
+  public void addField(ValueVector<?> vector) throws SchemaChangeException ;
+>>>>>>> Build working
   public void setNewSchema() throws SchemaChangeException ;
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 2f3e1fe..1e0c000 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -18,20 +18,37 @@
 package org.apache.drill.exec.physical.impl;
 
 import java.util.Iterator;
+<<<<<<< HEAD
+=======
+import java.util.List;
+import java.util.Map;
+>>>>>>> Build working
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.InvalidValueAccessor;
+import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SchemaBuilder;
 import org.apache.drill.exec.record.WritableBatch;
+<<<<<<< HEAD
+=======
+import org.apache.drill.exec.record.vector.SelectionVector2;
+import org.apache.drill.exec.record.vector.SelectionVector4;
+import org.apache.drill.exec.record.vector.ValueVector;
+>>>>>>> Build working
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.vector.ValueVector;
 
+<<<<<<< HEAD
 import com.carrotsearch.hppc.IntObjectOpenHashMap;
 import com.carrotsearch.hppc.procedures.IntObjectProcedure;
+=======
+import com.beust.jcommander.internal.Lists;
+import com.beust.jcommander.internal.Maps;
+>>>>>>> Build working
 
 /**
  * Record batch used for a particular scan. Operators against one or more
@@ -39,7 +56,14 @@ import com.carrotsearch.hppc.procedures.IntObjectProcedure;
 public class ScanBatch implements RecordBatch {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
 
+<<<<<<< HEAD
   private IntObjectOpenHashMap<ValueVector> fields = new IntObjectOpenHashMap<ValueVector>();
+=======
+  final List<ValueVector<?>> vectors = Lists.newLinkedList();
+  final Map<MaterializedField, ValueVector<?>> fieldVectorMap = Maps.newHashMap();
+  
+  private VectorHolder holder = new VectorHolder(vectors);
+>>>>>>> Build working
   private BatchSchema schema;
   private int recordCount;
   private boolean schemaChanged = true;
@@ -83,6 +107,7 @@ public class ScanBatch implements RecordBatch {
   }
 
   private void releaseAssets() {
+<<<<<<< HEAD
     fields.forEach(new IntObjectProcedure<ValueVector>() {
       @Override
       public void apply(int key, ValueVector value) {
@@ -102,9 +127,13 @@ public class ScanBatch implements RecordBatch {
       throw new InvalidValueAccessor(String.format(
           "You requested a field accessor of type %s for field id %d but the actual type was %s.",
           clazz.getCanonicalName(), fieldId, vector.getClass().getCanonicalName()));
+=======
+    for(ValueVector<?> v : vectors){
+      v.close();
+>>>>>>> Build working
     }
   }
-
+  
   @Override
   public IterOutcome next() {
     while ((recordCount = currentReader.next()) == 0) {
@@ -132,11 +161,34 @@ public class ScanBatch implements RecordBatch {
     }
   }
 
+  
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TypedFieldId getValueVector(SchemaPath path) {
+    return holder.getValueVector(path);
+  }
+
+  @Override
+  public <T extends ValueVector<T>> T getValueVectorById(int fieldId, Class<?> clazz) {
+    return holder.getValueVector(fieldId, clazz);
+  }
+
+
   private class Mutator implements OutputMutator {
     private SchemaBuilder builder = BatchSchema.newBuilder();
     
-    public void removeField(int fieldId) throws SchemaChangeException {
+    public void removeField(MaterializedField field) throws SchemaChangeException {
       schemaChanged();
+<<<<<<< HEAD
       ValueVector v = fields.remove(fieldId);
       if (v == null) throw new SchemaChangeException("Failure attempting to remove an unknown field.");
       v.close();
@@ -146,8 +198,18 @@ public class ScanBatch implements RecordBatch {
       schemaChanged();
       ValueVector v = fields.put(fieldId, vector);
       vector.getField();
+=======
+      ValueVector<?> vector = fieldVectorMap.remove(field);
+      if (vector == null) throw new SchemaChangeException("Failure attempting to remove an unknown field.");
+      vectors.remove(vector);
+      vector.close();
+    }
+
+    public void addField(ValueVector<?> vector) {
+      vectors.add(vector);
+      fieldVectorMap.put(vector.getField(), vector);
+>>>>>>> Build working
       builder.addField(vector.getField());
-      if (v != null) v.close();
     }
 
     @Override
@@ -160,7 +222,7 @@ public class ScanBatch implements RecordBatch {
 
   @Override
   public WritableBatch getWritableBatch() {
-    return WritableBatch.get(this.getRecordCount(), fields);
+    return WritableBatch.get(this.getRecordCount(), vectors);
   }
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/VectorHolder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/VectorHolder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/VectorHolder.java
new file mode 100644
index 0000000..4209daa
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/VectorHolder.java
@@ -0,0 +1,39 @@
+package org.apache.drill.exec.physical.impl;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
+import org.apache.drill.exec.record.vector.ValueVector;
+
+public class VectorHolder { // list-backed lookup of value vectors by schema path or by list position
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorHolder.class);
+  // Holds the very list reference handed to the constructor (no copy is made).
+  private List<ValueVector<?>> vectors;
+
+  public VectorHolder(List<ValueVector<?>> vectors) {
+    super();
+    this.vectors = vectors;
+  }
+  /** Returns the type and list index of the first vector whose field matches path, or null if none matches. */
+  public TypedFieldId getValueVector(SchemaPath path) {
+    for(int i =0; i < vectors.size(); i++){
+      ValueVector<?> vv = vectors.get(i);
+      if(vv.getField().matches(path)) return new TypedFieldId(vv.getField().getType(), i); 
+    }
+    return null;
+  }
+  /** Returns the vector at list index fieldId, or null (after logging a warning) when its class is not exactly clazz. */
+  @SuppressWarnings("unchecked")
+  public <T extends ValueVector<T>> T getValueVector(int fieldId, Class<?> clazz) {
+    ValueVector<?> v = vectors.get(fieldId);
+    assert v != null;
+    if (v.getClass() != clazz){ // exact class identity check: a subclass of clazz is rejected as well
+      logger.warn(String.format(
+          "Failure while reading vector.  Expected vector class of %s but was holding vector class %s.",
+          clazz.getCanonicalName(), v.getClass().getCanonicalName()));
+      return null;
+    }
+    return (T) v; // unchecked cast; the class check above is the only guard
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
index fcbd272..f4921b5 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -17,17 +17,26 @@
  ******************************************************************************/
 package org.apache.drill.exec.physical.impl;
 
+<<<<<<< HEAD
+=======
+import org.apache.drill.common.expression.SchemaPath;
+>>>>>>> Build working
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
 import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.InvalidValueAccessor;
 import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.record.RawFragmentBatchProvider;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.WritableBatch;
+<<<<<<< HEAD
 import org.apache.drill.exec.vector.ValueVector;
+=======
+import org.apache.drill.exec.record.vector.SelectionVector2;
+import org.apache.drill.exec.record.vector.SelectionVector4;
+import org.apache.drill.exec.record.vector.ValueVector;
+>>>>>>> Build working
 
 public class WireRecordBatch implements RecordBatch{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WireRecordBatch.class);
@@ -64,11 +73,32 @@ public class WireRecordBatch implements RecordBatch{
     fragProvider.kill(context);
   }
 
+  
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
+  public SelectionVector4 getSelectionVector4() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TypedFieldId getValueVector(SchemaPath path) {
+    return batchLoader.getValueVector(path);
+  }
+
+  @Override
+<<<<<<< HEAD
   public <T extends ValueVector> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor {
+=======
+  public <T extends ValueVector<T>> T getValueVectorById(int fieldId, Class<?> clazz) {
+>>>>>>> Build working
     return batchLoader.getValueVector(fieldId, clazz);
   }
 
+  
   @Override
   public IterOutcome next() {
     RawFragmentBatch batch = fragProvider.getNext();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvalSetupException.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvalSetupException.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvalSetupException.java
new file mode 100644
index 0000000..f547989
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvalSetupException.java
@@ -0,0 +1,5 @@
+package org.apache.drill.exec.physical.impl.filter;
+
+public class EvalSetupException extends Exception{ // checked exception; declares no constructors, so only the implicit no-arg one exists
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EvalSetupException.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvaluationPredicate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvaluationPredicate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvaluationPredicate.java
new file mode 100644
index 0000000..3176c41
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvaluationPredicate.java
@@ -0,0 +1,13 @@
+package org.apache.drill.exec.physical.impl.filter;
+
+import org.apache.drill.exec.record.vector.SelectionVector2;
+
+public class EvaluationPredicate { // skeleton: no behavior is implemented in this class yet
+  private SelectionVector2 vector; // never assigned or read within this class
+  // Package-private constructor.
+  EvaluationPredicate(String pred){
+    // empty body: pred is neither stored nor parsed here
+  }
+  
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ExampleFilter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ExampleFilter.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ExampleFilter.java
new file mode 100644
index 0000000..158350f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ExampleFilter.java
@@ -0,0 +1,113 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.physical.impl.filter;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.InvalidValueAccessor;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.vector.SelectionVector2;
+import org.apache.drill.exec.record.vector.SelectionVector4;
+import org.apache.drill.exec.record.vector.ValueVector;
+
+public class ExampleFilter implements RecordBatch { // prototype filter batch that wraps an upstream RecordBatch
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExampleFilter.class);
+
+  //private EvalutationPredicates []
+  private RecordBatch incoming; // NOTE(review): never assigned in this class, yet next()/getContext() dereference it - confirm wiring
+  private BatchSchema outboundSchema;
+  private int recordCount;
+
+  private void reconfigureSchema() throws SchemaChangeException { // copies the upstream fields and selects TWO_BYTE selection-vector mode
+    BatchSchema in = incoming.getSchema();
+    outboundSchema = BatchSchema.newBuilder().addFields(in).setSelectionVectorMode(BatchSchema.SelectionVectorMode.TWO_BYTE).build();
+  }
+
+  private int generateSelectionVector(){
+    return -1; // placeholder: no selection vector is built yet
+  }
+
+  @Override
+  public FragmentContext getContext() {
+    return incoming.getContext();
+  }
+
+  @Override
+  public BatchSchema getSchema() {
+    return outboundSchema;
+  }
+
+  @Override
+  public int getRecordCount() {
+    return recordCount; // value stored by the most recent next() that saw OK or OK_NEW_SCHEMA
+  }
+
+  @Override
+  public void kill() {
+    // no-op in this prototype: nothing is propagated to the upstream batch
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    return null;
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    return null;
+  }
+
+  @Override
+  public TypedFieldId getValueVector(SchemaPath path) {
+    return null;
+  }
+
+  @Override
+  public <T extends ValueVector<T>> T getValueVectorById(int fieldId, Class<?> vvClass) {
+    return null;
+  }
+
+  @Override
+  public IterOutcome next() {
+    IterOutcome out = incoming.next(); // advance the upstream batch exactly once per call
+    switch (out) { // dispatch on the outcome fetched above; calling next() again here would skip a batch
+      // NONE and STOP pass straight through; OK and OK_NEW_SCHEMA refresh recordCount and return the upstream outcome.
+      case NONE:
+        return IterOutcome.NONE;
+      case OK_NEW_SCHEMA: // falls through to OK; schema reconfiguration is still disabled
+        //reconfigureSchema();
+      case OK:
+        this.recordCount = generateSelectionVector();
+        return out;
+      case STOP:
+        return IterOutcome.STOP;
+      default:
+        throw new UnsupportedOperationException();
+    }
+  }
+
+  @Override
+  public WritableBatch getWritableBatch() {
+    return null; // not implemented in this prototype
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
new file mode 100644
index 0000000..69daae0
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
@@ -0,0 +1,12 @@
+package org.apache.drill.exec.physical.impl.project;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+
+public interface ProjectEvaluator { // contract for projection evaluators: one setup call, then per-record work
+  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectEvaluator.class);
+  // Interface fields are implicitly public static final and methods implicitly public abstract, so the modifiers are omitted.
+  void setupEvaluators(FragmentContext context, RecordBatch incoming) throws SchemaChangeException;
+  void doPerRecordWork(int inIndex, int outIndex);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
new file mode 100644
index 0000000..cfdb7bc
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -0,0 +1,218 @@
+package org.apache.drill.exec.physical.impl.project;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.ValueVectorReadExpression;
+import org.apache.drill.exec.expr.ValueVectorWriteExpression;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
+import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
+import org.apache.drill.exec.proto.SchemaDefProtos.NamePart.Type;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.SchemaBuilder;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.vector.SelectionVector2;
+import org.apache.drill.exec.record.vector.SelectionVector4;
+import org.apache.drill.exec.record.vector.TypeHelper;
+import org.apache.drill.exec.record.vector.ValueVector;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Record batch that applies the named expressions of a {@link Project} operator to each batch produced
+ * by an upstream {@link RecordBatch}.
+ *
+ * <p>Whenever the upstream reports a new schema, a fresh {@link Projector} is generated. Expressions that
+ * are plain reads of an incoming vector (and only when the incoming batch has no selection vector) are
+ * satisfied by transferring the vector; every other expression is evaluated into a newly allocated
+ * output vector.
+ */
+public class ProjectRecordBatch implements RecordBatch{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
+
+  private final Project pop;
+  private final RecordBatch incoming;
+  private final FragmentContext context;
+  private BatchSchema outSchema;
+  private Projector projector;
+  // Output vectors that must be (re)allocated for every batch because they are filled by evaluation.
+  private List<ValueVector<?>> allocationVectors;
+  // All output vectors, in expression order (transferred and evaluated alike).
+  private List<ValueVector<?>> outputVectors;
+
+  /**
+   * @param pop      physical operator definition holding the expressions to project
+   * @param incoming upstream batch to read from
+   * @param context  fragment context used for allocation, code generation and failure reporting
+   */
+  public ProjectRecordBatch(Project pop, RecordBatch incoming, FragmentContext context){
+    this.pop = pop;
+    this.incoming = incoming;
+    this.context = context;
+  }
+
+  @Override
+  public FragmentContext getContext() {
+    return context;
+  }
+
+  /**
+   * Returns the output schema; fails with a NullPointerException if no projector has been built yet.
+   */
+  @Override
+  public BatchSchema getSchema() {
+    Preconditions.checkNotNull(outSchema);
+    return outSchema;
+  }
+
+  /** Projection is one-to-one, so the record count is whatever the upstream batch reports. */
+  @Override
+  public int getRecordCount() {
+    return incoming.getRecordCount();
+  }
+
+  @Override
+  public void kill() {
+    incoming.kill();
+  }
+
+  /** This batch always publishes {@link SelectionVectorMode#NONE}, so no selection vector exists. */
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** This batch always publishes {@link SelectionVectorMode#NONE}, so no selection vector exists. */
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    throw new UnsupportedOperationException();
+  }
+
+  // TODO: not implemented yet; downstream operators cannot resolve output vectors by path.
+  @Override
+  public TypedFieldId getValueVector(SchemaPath path) {
+    return null;
+  }
+
+  // TODO: not implemented yet; downstream operators cannot fetch output vectors by id.
+  @Override
+  public <T extends ValueVector<T>> T getValueVectorById(int fieldId, Class<?> vvClass) {
+    return null;
+  }
+
+  /**
+   * Pulls the next upstream batch and projects it.
+   *
+   * @return the upstream outcome unchanged, except that a failure to build the projector on a schema
+   *         change kills the upstream, reports the failure to the context and returns {@code STOP}
+   */
+  @Override
+  public IterOutcome next() {
+
+    IterOutcome upstream = incoming.next();
+    switch(upstream){
+    case NONE:
+    case NOT_YET:
+    case STOP:
+      return upstream;
+    case OK_NEW_SCHEMA:
+      try{
+        projector = createNewProjector();
+      }catch(SchemaChangeException ex){
+        incoming.kill();
+        context.fail(ex);
+        return IterOutcome.STOP;
+      }
+      // fall through.
+    case OK:
+      int recordCount = incoming.getRecordCount();
+      for(ValueVector<?> v : this.allocationVectors){
+        v.allocateNew(recordCount);
+      }
+      projector.projectRecords(recordCount, 0);
+      return upstream; // change if upstream changed, otherwise normal.
+    default:
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /**
+   * Rebuilds the output vectors, output schema and generated projector for the current incoming schema.
+   *
+   * @return a projector that has already been set up against the incoming batch
+   * @throws SchemaChangeException if an expression cannot be materialized or the generated class
+   *                               cannot be loaded or set up
+   */
+  private Projector createNewProjector() throws SchemaChangeException{
+    this.allocationVectors = Lists.newArrayList();
+    // Release vectors belonging to the previous schema before replacing them.
+    if(outputVectors != null){
+      for(ValueVector<?> v : outputVectors){
+        v.close();
+      }
+    }
+    this.outputVectors = Lists.newArrayList();
+
+    final List<NamedExpression> exprs = pop.getExprs();
+    final ErrorCollector collector = new ErrorCollectorImpl();
+    final List<TransferPairing<?>> transfers = Lists.newArrayList();
+
+    final CodeGenerator cg = new CodeGenerator("setupEvaluators", "doPerRecordWork", context.getFunctionRegistry());
+
+    for(final NamedExpression namedExpression : exprs){
+      final MaterializedField outputField = getMaterializedField(namedExpression);
+      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector);
+      if(collector.hasErrors()){
+        throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
+      }
+
+      // add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack.
+      if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVector() == SelectionVectorMode.NONE){
+        ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
+        ValueVector<?> vvIn = incoming.getValueVectorById(vectorRead.getFieldId(), TypeHelper.getValueVectorClass(vectorRead.getMajorType()));
+        // Guard the looked-up vector (not the batch, which has already been dereferenced above).
+        Preconditions.checkNotNull(vvIn);
+
+        TransferPairing<?> tp = vvIn.getTransferPair(outputField);
+        transfers.add(tp);
+        outputVectors.add(tp.getTo());
+      }else{
+        // need to do evaluation.
+        ValueVector<?> vector = TypeHelper.getNewVector(outputField, context.getAllocator());
+        allocationVectors.add(vector);
+        outputVectors.add(vector);
+        // The write targets the vector just appended, identified by its position in outputVectors.
+        ValueVectorWriteExpression write = new ValueVectorWriteExpression(outputVectors.size() - 1, expr);
+        cg.addNextWrite(write);
+      }
+    }
+
+    SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(SelectionVectorMode.NONE);
+    for(ValueVector<?> v : outputVectors){
+      bldr.addField(v.getField());
+    }
+    this.outSchema = bldr.build();
+
+    try {
+      final Projector newProjector = context.getImplementationClass(Projector.TEMPLATE_DEFINITION, cg);
+      // The transfer list only exists here, so the projector has to be bound before it is handed back;
+      // without this call its selection-vector mode and transfers are never initialised.
+      newProjector.setup(context, incoming, transfers);
+      return newProjector;
+    } catch (ClassTransformationException | IOException e) {
+      throw new SchemaChangeException("Failure while attempting to load generated class", e);
+    }
+  }
+
+  // TODO: not implemented yet; callers receive null.
+  @Override
+  public WritableBatch getWritableBatch() {
+    return null;
+  }
+
+  /** Builds the output field for an expression from its reference path and its expression type. */
+  private MaterializedField getMaterializedField(NamedExpression ex){
+    return new MaterializedField(getFieldDef(ex.getRef(), ex.getExpr().getMajorType()));
+  }
+
+  /** Assembles a {@link FieldDef} whose name parts follow {@code path} and whose type is {@code type}. */
+  private FieldDef getFieldDef(SchemaPath path, MajorType type){
+    return FieldDef //
+        .newBuilder() //
+        .addAllName(getNameParts(path.getRootSegment())) //
+        .setMajorType(type) //
+        .build();
+  }
+
+  /**
+   * Flattens a chain of path segments into name parts: an array segment becomes an {@code ARRAY} part,
+   * any other segment becomes a {@code NAME} part carrying the segment's path text.
+   *
+   * @param seg first segment of the chain; {@code null} yields an empty list
+   */
+  private List<NamePart> getNameParts(PathSegment seg){
+    List<NamePart> parts = Lists.newArrayList();
+    // Walk the chain via getChild(); without advancing, a non-null segment would loop forever.
+    for(PathSegment current = seg; current != null; current = current.getChild()){
+      if(current.isArray()){
+        parts.add(NamePart.newBuilder().setType(Type.ARRAY).build());
+      }else{
+        parts.add(NamePart.newBuilder().setType(Type.NAME).setName(current.getNameSegment().getPath().toString()).build());
+      }
+    }
+    return parts;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
new file mode 100644
index 0000000..31c418c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
@@ -0,0 +1,20 @@
+package org.apache.drill.exec.physical.impl.project;
+
+import java.util.List;
+
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+
+/**
+ * Runtime-generated worker that projects the records of one incoming batch into output vectors.
+ */
+public interface Projector {
+
+  /**
+   * Binds the projector to an incoming batch.
+   *
+   * @param context   fragment context the operator runs in
+   * @param incoming  batch to read from
+   * @param transfers vector pairs that are moved wholesale instead of being evaluated per record
+   * @throws SchemaChangeException if the projector cannot be bound to the incoming batch
+   */
+  public abstract void setup(FragmentContext context, RecordBatch incoming, List<TransferPairing<?>> transfers)  throws SchemaChangeException;
+
+  /**
+   * Projects {@code recordCount} records, writing results starting at {@code firstOutputIndex}.
+   */
+  public abstract void projectRecords(int recordCount, int firstOutputIndex);
+
+  // The template name must be the fully-qualified name of the abstract template class in this package,
+  // which is ProjectorTemplate (there is no class called ProjectTemplate).
+  public static TemplateClassDefinition<Projector, Void> TEMPLATE_DEFINITION = new TemplateClassDefinition<Projector, Void>( //
+      Projector.class, "org.apache.drill.exec.physical.impl.project.ProjectorTemplate", ProjectEvaluator.class, Void.class);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
new file mode 100644
index 0000000..60af7d2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
@@ -0,0 +1,101 @@
+package org.apache.drill.exec.physical.impl.project;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.ValueVectorReadExpression;
+import org.apache.drill.exec.expr.ValueVectorWriteExpression;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
+import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
+import org.apache.drill.exec.proto.SchemaDefProtos.NamePart.Type;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.vector.SelectionVector2;
+import org.apache.drill.exec.record.vector.SelectionVector4;
+import org.apache.drill.exec.record.vector.TypeHelper;
+import org.apache.drill.exec.record.vector.ValueVector;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+public abstract class ProjectorTemplate implements Projector {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectorTemplate.class);
+
+  private ImmutableList<TransferPairing<?>> transfers;
+  private SelectionVector2 vector2;
+  private SelectionVector4 vector4;
+  private SelectionVectorMode svMode;
+  
+  public ProjectorTemplate(final FragmentContext context, final RecordBatch incomingBatch, final Project pop, FunctionImplementationRegistry funcRegistry) throws SchemaChangeException{
+    super();
+  }
+
+  @Override
+  public final void projectRecords(final int recordCount, int firstOutputIndex) {
+    switch(svMode){
+    case FOUR_BYTE:
+      throw new UnsupportedOperationException();
+      
+      
+    case TWO_BYTE:
+      final int count = recordCount*2;
+      for(int i = 0; i < count; i+=2, firstOutputIndex++){
+        doPerRecordWork(vector2.getIndex(i), firstOutputIndex);
+      }
+      return;
+      
+      
+    case NONE:
+      
+      for(TransferPairing<?> t : transfers){
+        t.transfer();
+      }
+      final int countN = recordCount;
+      for (int i = 0; i < countN; i++, firstOutputIndex++) {
+        doPerRecordWork(i, firstOutputIndex);
+      }
+      return;
+      
+      
+    default:
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  @Override
+  public final void setup(FragmentContext context, RecordBatch incoming, List<TransferPairing<?>> transfers)  throws SchemaChangeException{
+
+    this.svMode = incoming.getSchema().getSelectionVector(); 
+    switch(svMode){
+    case FOUR_BYTE:
+      this.vector4 = incoming.getSelectionVector4();
+      break;
+    case TWO_BYTE:
+      this.vector2 = incoming.getSelectionVector2();
+      break;
+    }
+    this.transfers = ImmutableList.copyOf(transfers);
+    setupEvaluators(context, incoming);
+  }
+
+  protected abstract void setupEvaluators(FragmentContext context, RecordBatch incoming) throws SchemaChangeException;
+  protected abstract void doPerRecordWork(int inIndex, int outIndex);
+
+  
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/TransferPairing.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/TransferPairing.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/TransferPairing.java
new file mode 100644
index 0000000..2b4ac81
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/TransferPairing.java
@@ -0,0 +1,35 @@
+package org.apache.drill.exec.physical.impl.project;
+
+import org.apache.drill.exec.record.vector.ValueVector;
+
+/**
+ * Holds a source vector and a destination vector of the same type so the source's contents can later be
+ * handed to the destination with a single {@link #transfer()} call.
+ *
+ * @param <T> concrete vector type shared by both ends of the pairing
+ */
+public class TransferPairing<T extends ValueVector<T>> {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TransferPairing.class);
+
+  final T from;
+  final T to;
+
+  /** Use {@link #getTransferPairing(ValueVector, ValueVector)} unless subclassing. */
+  protected TransferPairing(T from, T to) {
+    this.from = from;
+    this.to = to;
+  }
+
+  /**
+   * Factory for a pairing of {@code from} and {@code to}.
+   */
+  public static <T extends ValueVector<T>> TransferPairing<T> getTransferPairing(T from, T to){
+    final TransferPairing<T> pairing = new TransferPairing<T>(from, to);
+    return pairing;
+  }
+
+  /** @return the source vector */
+  public T getFrom() {
+    return from;
+  }
+
+  /** @return the destination vector */
+  public T getTo() {
+    return to;
+  }
+
+  /** Delegates to the source vector's {@code transferTo}, passing the destination. */
+  public void transfer(){
+    from.transferTo(to);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
index 3c2df61..1148c93 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.coord.DrillbitEndpointSerDe;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.FragmentLeaf;
@@ -28,7 +29,6 @@ import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalOperatorUtil;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.proto.SchemaDefProtos.MajorType;
 import org.apache.drill.exec.record.MajorTypeSerDe;
 
 import com.fasterxml.jackson.core.JsonProcessingException;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index b26e742..bb07e56 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,13 +23,17 @@ import java.util.List;
 
 public class BatchSchema implements Iterable<MaterializedField> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchSchema.class);
-
+  final SelectionVectorMode selectionVector;
+  ;
   private final List<MaterializedField> fields;
-  final boolean hasSelectionVector;
 
-  BatchSchema(boolean hasSelectionVector, List<MaterializedField> fields) {
+  BatchSchema(SelectionVectorMode selectionVector, List<MaterializedField> fields) {
     this.fields = fields;
-    this.hasSelectionVector = hasSelectionVector;
+    this.selectionVector = selectionVector;
+  }
+
+  public static SchemaBuilder newBuilder() {
+    return new SchemaBuilder();
   }
 
   @Override
@@ -37,16 +41,24 @@ public class BatchSchema implements Iterable<MaterializedField> {
     return fields.iterator();
   }
 
-  public static SchemaBuilder newBuilder() {
-    return new SchemaBuilder();
+  public SelectionVectorMode getSelectionVector() {
+    return selectionVector;
   }
 
   @Override
   public String toString() {
-    return "BatchSchema [fields=" + fields + ", hasSelectionVector=" + hasSelectionVector + "]";
+    return "BatchSchema [fields=" + fields + ", selectionVector=" + selectionVector + "]";
   }
 
-  
-  
-  
+  /**
+   * Kind of selection vector that accompanies a batch.
+   *
+   * <p>{@code size} is -1 for {@code NONE}, 2 for {@code TWO_BYTE} and 4 for {@code FOUR_BYTE}
+   * (presumably the width of one selection entry in bytes -- confirm against the vector classes).
+   */
+  public static enum SelectionVectorMode {
+    NONE(-1, false), TWO_BYTE(2, true), FOUR_BYTE(4, true);
+
+    /** Whether a batch in this mode carries a selection vector. */
+    public final boolean hasSelectionVector;
+    public final int size;
+
+    SelectionVectorMode(int size, boolean hasSelectionVector) {
+      this.size = size;
+      // Previously the flag was never stored, so it read false for every mode.
+      this.hasSelectionVector = hasSelectionVector;
+    }
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpressionTreeMaterializer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpressionTreeMaterializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpressionTreeMaterializer.java
deleted file mode 100644
index 391aec5..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpressionTreeMaterializer.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-
-package org.apache.drill.exec.record;
-
-import com.google.common.collect.Lists;
-import org.apache.drill.common.expression.*;
-import org.apache.drill.common.expression.types.DataType;
-import org.apache.drill.common.expression.visitors.ExprVisitor;
-
-import java.util.List;
-
-public class ExpressionTreeMaterializer {
-    public LogicalExpression Materialize(LogicalExpression expr, BatchSchema schema, ErrorCollector errorCollector) {
-        return expr.accept(new MaterializeVisitor(schema, errorCollector));
-    }
-
-    private class MaterializeVisitor implements ExprVisitor<LogicalExpression> {
-        private final ErrorCollector errorCollector;
-        private final BatchSchema schema;
-        private boolean isModified; // Flag to track if children is changed
-
-        public MaterializeVisitor(BatchSchema schema, ErrorCollector errorCollector) {
-            this.schema = schema;
-            this.errorCollector = errorCollector;
-            isModified = false;
-        }
-
-        private LogicalExpression validateNewExpr(LogicalExpression newExpr) {
-            StringBuilder stringBuilder = new StringBuilder();
-            newExpr.addToString(stringBuilder);
-            newExpr.resolveAndValidate(stringBuilder.toString(), errorCollector);
-            return newExpr;
-        }
-
-        @Override
-        public LogicalExpression visitFunctionCall(FunctionCall call) {
-            List<LogicalExpression> args = Lists.newArrayList(call.iterator());
-            boolean hasChanged = false;
-            for (int i = 0; i < args.size(); ++i) {
-                LogicalExpression newExpr = args.get(i).accept(this);
-                if (isModified) {
-                    hasChanged = true;
-                    args.set(i, newExpr);
-                    isModified = false;
-                }
-            }
-
-            if(hasChanged) {
-                isModified = true;
-                return validateNewExpr(new FunctionCall(call.getDefinition(), args));
-            }
-
-            return call;
-        }
-
-        @Override
-        public LogicalExpression visitIfExpression(IfExpression ifExpr) {
-            List<IfExpression.IfCondition> conditions = Lists.newArrayList(ifExpr.iterator());
-            boolean hasChanged = false;
-            LogicalExpression newElseExpr = null;
-            if(ifExpr.elseExpression != null) {
-                newElseExpr = ifExpr.elseExpression.accept(this);
-                hasChanged = isModified;
-            }
-
-            isModified = false;
-
-            for(int i = 0; i < conditions.size(); ++i) {
-                IfExpression.IfCondition condition = conditions.get(i);
-
-                LogicalExpression newCondition = condition.condition.accept(this);
-                boolean modified = isModified;
-                isModified = false;
-                LogicalExpression newExpr = condition.expression.accept(this);
-                if(modified || isModified) {
-                    conditions.set(i, new IfExpression.IfCondition(newCondition, newExpr));
-                    hasChanged = true;
-                    isModified = false;
-                }
-            }
-
-            if(hasChanged) {
-                isModified = true;
-                return validateNewExpr(IfExpression.newBuilder().setElse(newElseExpr).addConditions(conditions).build());
-            }
-
-            return ifExpr;
-        }
-
-        @Override
-        public LogicalExpression visitSchemaPath(SchemaPath path) {
-            for (MaterializedField field : schema) {
-                if (field.getType() != DataType.LATEBIND && field.matches(path)) {
-                    isModified = true;
-                    return validateNewExpr(new FieldReference(path.getPath().toString(), field.getType()));
-                }
-            }
-
-            return path;
-        }
-
-        @Override
-        public LogicalExpression visitLongExpression(ValueExpressions.LongExpression intExpr) {
-            return intExpr;
-        }
-
-        @Override
-        public LogicalExpression visitDoubleExpression(ValueExpressions.DoubleExpression dExpr) {
-            return dExpr;
-        }
-
-        @Override
-        public LogicalExpression visitBoolean(ValueExpressions.BooleanExpression e) {
-            return e;
-        }
-
-        @Override
-        public LogicalExpression visitQuotedString(ValueExpressions.QuotedString e) {
-            return e;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java
index 718396e..8799546 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java
@@ -19,9 +19,9 @@ package org.apache.drill.exec.record;
 
 import java.io.IOException;
 
-import org.apache.drill.exec.proto.SchemaDefProtos.DataMode;
-import org.apache.drill.exec.proto.SchemaDefProtos.MajorType;
-import org.apache.drill.exec.proto.SchemaDefProtos.MinorType;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index b692a93..6cf7087 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -18,18 +18,17 @@
 package org.apache.drill.exec.record;
 
 import java.util.Iterator;
-import java.util.List;
 
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.physical.RecordField.ValueMode;
-import org.apache.drill.exec.proto.SchemaDefProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
-import org.apache.drill.exec.proto.SchemaDefProtos.MajorType;
 import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
 import org.apache.drill.exec.proto.SchemaDefProtos.NamePart.Type;
+import org.apache.drill.exec.record.vector.TypeHelper;
 
-public class MaterializedField implements Comparable<MaterializedField> {
+public class MaterializedField{
   private final FieldDef def;
 
   public MaterializedField(FieldDef def) {
@@ -40,12 +39,10 @@ public class MaterializedField implements Comparable<MaterializedField> {
     return new MaterializedField(def);
   }
   
-  public static MaterializedField create(SchemaPath path, int fieldId, int parentId, MajorType type) {
+  public static MaterializedField create(SchemaPath path, MajorType type) {
     FieldDef.Builder b = FieldDef.newBuilder();
-    b.setFieldId(fieldId);
     b.setMajorType(type);
     addSchemaPathToFieldDef(path, b);
-    b.setParentId(parentId);
     return create(b.build());
   }
 
@@ -90,10 +87,6 @@ public class MaterializedField implements Comparable<MaterializedField> {
     return def.getMajorType().getWidth();
   }
 
-  public int getFieldId() {
-    return def.getFieldId();
-  }
-
   public MajorType getType() {
     return def.getMajorType();
   }
@@ -120,6 +113,9 @@ public class MaterializedField implements Comparable<MaterializedField> {
       throw new UnsupportedOperationException();
     }
     return new MaterializedField(def.toBuilder().setMajorType(mt.toBuilder().setMode(newDataMode).build()).build());
+
+  public Class<?> getValueClass() {
+    return TypeHelper.getValueVectorClass(getType().getMinorType(), getDataMode());
   }
 
   public boolean matches(SchemaPath path) {
@@ -141,46 +137,8 @@ public class MaterializedField implements Comparable<MaterializedField> {
     }
     // we've reviewed all path segments. confirm that we don't have any extra name parts.
     return !iter.hasNext();
-
-  private void check(String name, Object val1, Object expected) throws SchemaChangeException{
-    if(expected.equals(val1)) return;
-    throw new SchemaChangeException("Expected and actual field definitions don't match. Actual %s: %s, expected %s: %s", name, val1, name, expected);
   }
   
-  public void checkMaterialization(MaterializedField expected) throws SchemaChangeException{
-    if(this.type == expected.type || expected.type == DataType.LATEBIND) throw new SchemaChangeException("Expected and actual field definitions don't match. Actual DataType: %s, expected DataTypes: %s", this.type, expected.type);
-    if(expected.valueClass != null) check("valueClass", this.valueClass, expected.valueClass);
-    check("fieldId", this.fieldId, expected.fieldId);
-    check("nullability", this.nullable, expected.nullable);
-    check("valueMode", this.mode, expected.mode);
-  }
-
-  // private void check(String name, Object val1, Object expected) throws SchemaChangeException{
-  // if(expected.equals(val1)) return;
-  // throw new
-  // SchemaChangeException("Expected and actual field definitions don't match. Actual %s: %s, expected %s: %s", name,
-  // val1, name, expected);
-  // }
-
-  // public void checkMaterialization(MaterializedField expected) throws SchemaChangeException{
-  // if(this.type == expected.type || expected.type == DataType.LATEBIND) throw new
-  // SchemaChangeException("Expected and actual field definitions don't match. Actual DataType: %s, expected DataTypes: %s",
-  // this.type, expected.type);
-  // if(expected.valueClass != null) check("valueClass", this.valueClass, expected.valueClass);
-  // check("fieldId", this.fieldId, expected.fieldId);
-  // check("nullability", this.nullable, expected.nullable);
-  // check("valueMode", this.mode, expected.mode);
-  // }
-  //
-  // public MaterializedField getNullableVersion(Class<?> valueClass){
-  // return new MaterializedField(path, fieldId, type, true, mode, valueClass);
-  // }
-
-  @Override
-  public int compareTo(MaterializedField o) {
-    return Integer.compare(this.getFieldId(), o.getFieldId());
-  }
-
   @Override
   public String toString() {
     return "MaterializedField [" + def.toString() + "]";

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/NullExpression.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/NullExpression.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/NullExpression.java
new file mode 100644
index 0000000..4ab908f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/NullExpression.java
@@ -0,0 +1,32 @@
+package org.apache.drill.exec.record;
+
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.visitors.ExprVisitor;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+/** A {@link LogicalExpression} whose major type is fixed to minor type NULL in OPTIONAL mode. */
+public class NullExpression implements LogicalExpression{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NullExpression.class);
+  /** Shared instance, created when the class is initialized. */
+  public static final NullExpression INSTANCE = new NullExpression();
+  /** The value returned by {@link #getMajorType()}: mode OPTIONAL, minor type NULL. */
+  final MajorType t = MajorType.newBuilder().setMode(DataMode.OPTIONAL).setMinorType(MinorType.NULL).build();
+  
+  @Override
+  public MajorType getMajorType() {
+    return t;
+  }
+
+  @Override
+  public <T, V, E extends Exception> T accept(ExprVisitor<T, V, E> visitor, V value) throws E {
+    return visitor.visitUnknown(this, value);
+  }
+
+  @Override
+  public ExpressionPosition getPosition() {
+    return ExpressionPosition.UNKNOWN;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index c6b7888..40447ec 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -17,8 +17,12 @@
  ******************************************************************************/
 package org.apache.drill.exec.record;
 
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.record.vector.SelectionVector2;
+import org.apache.drill.exec.record.vector.SelectionVector4;
+import org.apache.drill.exec.record.vector.ValueVector;
 
 /**
  * A record batch contains a set of field values for a particular range of records. In the case of a record batch
@@ -39,6 +43,12 @@ public interface RecordBatch {
     NOT_YET // used by batches that haven't received incoming data yet.
   }
 
+  public static enum SetupOutcome {
+    OK,
+    OK_NEW_SCHEMA,
+    FAILED
+  }
+  
   /**
    * Access the FragmentContext of the current query fragment. Useful for reporting failure information or other query
    * level information.
@@ -67,12 +77,18 @@ public interface RecordBatch {
    */
   public void kill();
 
-
-  public abstract <T extends ValueVector> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor;
-
-//  public abstract void getDictReader(int fieldId, Class<?> clazz) throws InvalidValueAccessor;
-//
-//  public abstract void getRleReader(int fieldId, Class<?> clazz) throws InvalidValueAccessor;
+  public abstract SelectionVector2 getSelectionVector2();
+  public abstract SelectionVector4 getSelectionVector4();
+  
+  /**
+   * Looks up the value vector stored under the given path and describes it by type and field id.
+   * @param path The path where the vector should be located.
+   * @return The type and local field id associated with this vector.
+   */
+  public abstract TypedFieldId getValueVector(SchemaPath path);
+  
+  /** Returns the value vector for {@code fieldId}; {@code vvClass} is presumably the vector class the caller expects (TODO confirm with implementations). */
+  public abstract <T extends ValueVector<T>> T getValueVectorById(int fieldId, Class<?> vvClass);
 
   /**
    * Update the data in each Field reading interface for the next range of records. Once a RecordBatch returns an
@@ -89,4 +105,41 @@ public interface RecordBatch {
    */
   public WritableBatch getWritableBatch();
 
+  /** Pairs a field's {@link MajorType} with its integer field id. */
+  public static class TypedFieldId{
+    final MajorType type;
+    final int fieldId;
+
+    /**
+     * Stores both values as given; {@code type} may be null (equals and hashCode allow for it).
+     */
+    public TypedFieldId(MajorType type, int fieldId) {
+      this.type = type;
+      this.fieldId = fieldId;
+    }
+
+    public MajorType getType() {
+      return type;
+    }
+
+    public int getFieldId() {
+      return fieldId;
+    }
+
+    /** True when {@code obj} is exactly this class with the same field id and an equal (or equally null) type. */
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) return true;
+      if (obj == null || getClass() != obj.getClass()) return false;
+      TypedFieldId other = (TypedFieldId) obj;
+      return fieldId == other.fieldId && (type == null ? other.type == null : type.equals(other.type));
+    }
+
+    /** Derived from the same two fields as {@link #equals(Object)}, so equal ids always share a hash code. */
+    @Override
+    public int hashCode() {
+      return 31 * fieldId + (type == null ? 0 : type.hashCode());
+    }
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index a2dbd81..c3db9f0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -21,22 +21,28 @@ import io.netty.buffer.ByteBuf;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
 import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
-import org.apache.drill.exec.vector.TypeHelper;
-import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
+import org.apache.drill.exec.record.vector.TypeHelper;
+import org.apache.drill.exec.record.vector.ValueVector;
 
+import com.beust.jcommander.internal.Lists;
+import com.beust.jcommander.internal.Maps;
 import com.carrotsearch.hppc.IntObjectOpenHashMap;
 import com.carrotsearch.hppc.cursors.IntObjectCursor;
+import com.google.common.collect.ImmutableList;
 
-public class RecordBatchLoader implements Iterable<IntObjectCursor<ValueVector>>{
+public class RecordBatchLoader implements Iterable<ValueVector<?>>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchLoader.class);
 
-  private IntObjectOpenHashMap<ValueVector> vectors = new IntObjectOpenHashMap<ValueVector>();
+  private List<ValueVector<?>> vectors = Lists.newArrayList();
   private final BufferAllocator allocator;
   private int recordCount; 
   private BatchSchema schema;
@@ -60,64 +66,72 @@ public class RecordBatchLoader implements Iterable<IntObjectCursor<ValueVector>>
 //    logger.debug("Loading record batch with def {} and data {}", def, buf);
     this.recordCount = def.getRecordCount();
     boolean schemaChanged = false;
+
+    Map<MaterializedField, ValueVector<?>> oldFields = Maps.newHashMap();
+    for(ValueVector<?> v : this.vectors){
+      oldFields.put(v.getField(), v);
+    }
     
-    IntObjectOpenHashMap<ValueVector> newVectors = new IntObjectOpenHashMap<ValueVector>();
+    List<ValueVector<?>> newVectors = Lists.newArrayList();
 
     List<FieldMetadata> fields = def.getFieldList();
     
     int bufOffset = 0;
     for (FieldMetadata fmd : fields) {
       FieldDef fieldDef = fmd.getDef();
-      ValueVector v = vectors.remove(fieldDef.getFieldId());
-      if (v != null) {
-        if (v.getField().getDef().equals(fieldDef)) {
-          ValueVector.Mutator m = v.getMutator();
-          v.load(fmd, buf.slice(bufOffset, fmd.getBufferLength()));
-          newVectors.put(fieldDef.getFieldId(), v);
-          continue;
-        } else {
-          v.close();
-          v = null;
-        }
+      ValueVector<?> v = oldFields.remove(fieldDef);
+      if(v != null){
+        newVectors.add(v);
+        continue;
       }
-      // if we arrive here, either the metadata didn't match, or we didn't find a vector.
+      
+      // if we arrive here, we didn't have a matching vector.
       schemaChanged = true;
       MaterializedField m = new MaterializedField(fieldDef);
       v = TypeHelper.getNewVector(m, allocator);
       v.load(fmd, buf.slice(bufOffset, fmd.getBufferLength()));
       newVectors.put(fieldDef.getFieldId(), v);
+      v.setTo(fmd, buf.slice(bufOffset, fmd.getBufferLength()));
+      newVectors.add(v);
     }
     
-    if(!vectors.isEmpty()){
+    if(!oldFields.isEmpty()){
       schemaChanged = true;
-      for(IntObjectCursor<ValueVector> cursor : newVectors){
-        cursor.value.close();
+      for(ValueVector<?> v : oldFields.values()){
+        v.close();
       }
-      
     }
     
-    if(schemaChanged){
-      // rebuild the schema.
-      SchemaBuilder b = BatchSchema.newBuilder();
-      for(IntObjectCursor<ValueVector> cursor : newVectors){
-        b.addField(cursor.value.getField());
-      }
-      b.setSelectionVector(false);
-      this.schema = b.build();
+    // rebuild the schema.
+    SchemaBuilder b = BatchSchema.newBuilder();
+    for(ValueVector<?> v : newVectors){
+      b.addField(v.getField());
     }
-    vectors = newVectors;
+    b.setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
+    this.schema = b.build();
+    vectors = ImmutableList.copyOf(newVectors);
     return schemaChanged;
 
   }
 
+  /** Returns the type and list position of the first vector whose field matches {@code path}, or null when none matches. */
+  public TypedFieldId getValueVector(SchemaPath path) {
+    for(int i =0; i < vectors.size(); i++){
+      ValueVector<?> vv = vectors.get(i);
+      if(vv.getField().matches(path)) return new TypedFieldId(vv.getField().getType(), i); 
+    }
+    return null;
+  }
   @SuppressWarnings("unchecked")
-  public <T extends ValueVector> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor {
-    ValueVector v = vectors.get(fieldId);
+  public <T extends ValueVector<T>> T getValueVector(int fieldId, Class<?> clazz) {
+    ValueVector<?> v = vectors.get(fieldId);
     assert v != null;
-    if (v.getClass() != clazz)
-      throw new InvalidValueAccessor(String.format(
+    if (v.getClass() != clazz){
+      logger.warn(String.format(
           "Failure while reading vector.  Expected vector class of %s but was holding vector class %s.",
           clazz.getCanonicalName(), v.getClass().getCanonicalName()));
+      return null;
+    }
     return (T) v;
   }
 
@@ -131,7 +145,7 @@ public class RecordBatchLoader implements Iterable<IntObjectCursor<ValueVector>>
   }
 
   @Override
-  public Iterator<IntObjectCursor<ValueVector>> iterator() {
+  public Iterator<ValueVector<?>> iterator() {
     return this.vectors.iterator();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
index 1e25b1a..20cc82c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
@@ -17,15 +17,12 @@
  ******************************************************************************/
 package org.apache.drill.exec.record;
 
-import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 
-import org.apache.drill.common.expression.types.DataType;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.physical.RecordField.ValueMode;
 
-import com.carrotsearch.hppc.IntObjectOpenHashMap;
-import com.carrotsearch.hppc.cursors.ObjectCursor;
+import com.beust.jcommander.internal.Sets;
 import com.google.common.collect.Lists;
 
 /**
@@ -33,40 +30,39 @@ import com.google.common.collect.Lists;
  * builder will always check that this schema is a equal or more materialized version of the current schema.
  */
 public class SchemaBuilder {
-  private IntObjectOpenHashMap<MaterializedField> fields = new IntObjectOpenHashMap<MaterializedField>();
-  private IntObjectOpenHashMap<MaterializedField> expectedFields = new IntObjectOpenHashMap<MaterializedField>();
+  private Set<MaterializedField> fields = Sets.newHashSet();
 
-  private boolean hasSelectionVector;
-
-  public SchemaBuilder(BatchSchema expected) {
-    for (MaterializedField f : expected) {
-      expectedFields.put(f.getFieldId(), f);
-    }
-    hasSelectionVector = expected.hasSelectionVector;
-  }
+  private BatchSchema.SelectionVectorMode selectionVectorMode;
 
   SchemaBuilder() {
   }
 
-  /**
-   * Add a field where we don't have type information. In this case, DataType will be set to LATEBIND and valueClass
-   * will be set to null.
-   * 
-   * @param fieldId
-   *          The desired fieldId. Should be unique for this BatchSchema.
-   * @param nullable
-   *          Whether this field supports nullability.
-   * @param mode
-   * @throws SchemaChangeException
-   */
+//  /**
+//   * Add a field where we don't have type information. In this case, DataType will be set to LATEBIND and valueClass
+//   * will be set to null.
+//   *
+//   * @param fieldId
+//   *          The desired fieldId. Should be unique for this BatchSchema.
+//   * @param nullable
+//   *          Whether this field supports nullability.
+//   * @param mode
+//   * @throws SchemaChangeException
+//   */
 //  public void addLateBindField(short fieldId, boolean nullable, ValueMode mode) throws SchemaChangeException {
 //    addTypedField(fieldId, DataType.LATEBIND, nullable, mode, Void.class);
 //  }
 
-  public void setSelectionVector(boolean hasSelectionVector) {
-    this.hasSelectionVector = hasSelectionVector;
+  public SchemaBuilder setSelectionVectorMode(BatchSchema.SelectionVectorMode selectionVectorMode) {
+    this.selectionVectorMode = selectionVectorMode;
+    return this;
   }
 
+  /** Adds each element of {@code fields} through {@link #addField(MaterializedField)}.
+   *  @return this builder, so that calls can be chained */
+  public SchemaBuilder addFields(Iterable<MaterializedField> fields){
+    for(MaterializedField next : fields) addField(next);
+    return this;
+  }
   
 //  private void setTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass)
 //      throws SchemaChangeException {
@@ -83,8 +79,9 @@ public class SchemaBuilder {
 //    fields.put(f.getFieldId(), f);
 //  }
   
-  public void addField(MaterializedField f){
-    fields.put(f.getFieldId(), f);
+  public SchemaBuilder addField(MaterializedField f){
+    fields.add(f);
+    return this;
   }
 
 //  public void addTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass)
@@ -104,9 +101,9 @@ public class SchemaBuilder {
 //    setTypedField(fieldId, type, nullable, mode, valueClass);
 //  }
   
-  public void removeField(short fieldId) throws SchemaChangeException{
-    MaterializedField f = fields.remove(fieldId);
-    if(f == null) throw new SchemaChangeException("You attempted to remove an nonexistent field.");
+  public SchemaBuilder removeField(MaterializedField f) throws SchemaChangeException{
+    if(!fields.remove(f)) throw new SchemaChangeException("You attempted to remove an nonexistent field.");
+    return this;
   }
 
   /**
@@ -114,14 +111,8 @@ public class SchemaBuilder {
    * @return
    * @throws SchemaChangeException
    */
-  public BatchSchema build() throws SchemaChangeException {
-    // check if any fields are unaccounted for.
-
-    List<MaterializedField> fieldList = Lists.newArrayList();
-    for (ObjectCursor<MaterializedField> f : fields.values()) {
-      if (f != null) fieldList.add(f.value);
-    }
-    Collections.sort(fieldList);
-    return new BatchSchema(this.hasSelectionVector, fieldList);
+  public BatchSchema build(){
+    List<MaterializedField> fieldList = Lists.newArrayList(fields);
+    return new BatchSchema(this.selectionVectorMode, fieldList);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 4b97768..2e1754c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -57,50 +57,27 @@ public class WritableBatch {
   public ByteBuf[] getBuffers(){
     return buffers;
   }
-  
-//  public static WritableBatch get(ValueVector[] vectors){
-//    WritableCreator c = new WritableCreator();
-//    for(int i =0; i < vectors.length; i++){
-//      c.apply(i, vectors[i]);
-//    }
-//    return c.get();
-//  }
-//  
-  
-  public static WritableBatch get(int recordCount, IntObjectOpenHashMap<ValueVector> fields){
-    WritableCreator creator = new WritableCreator(recordCount);
-    fields.forEach(creator);
-    return creator.get();
-    
-  }
-  
-  private static class WritableCreator implements IntObjectProcedure<ValueVector>{
+
+  public static WritableBatch get(int recordCount, List<ValueVector<?>> vectors){
     
     List<ByteBuf> buffers = Lists.newArrayList();
     List<FieldMetadata> metadata = Lists.newArrayList();
-    private int recordCount;
     
 
-    public WritableCreator(int recordCount) {
-      super();
-      this.recordCount = recordCount;
-    }
-    
-    @Override
-    public void apply(int key, ValueVector value) {
-      metadata.add(value.getMetadata());
-      for(ByteBuf b : value.getBuffers()){
+    for(ValueVector<?> vv : vectors){
+      metadata.add(vv.getMetadata());
+      for(ByteBuf b : vv.getBuffers()){
         buffers.add(b);
         b.retain();
       }
-      value.clear();
+      // allocate new buffer to release hold on old buffer.
+      vv.allocateNew(vv.capacity());
     }
 
-    public WritableBatch get(){
-      RecordBatchDef batchDef = RecordBatchDef.newBuilder().addAllField(metadata).setRecordCount(recordCount).build();
-      WritableBatch b = new WritableBatch(batchDef, buffers);
+    RecordBatchDef batchDef = RecordBatchDef.newBuilder().addAllField(metadata).setRecordCount(recordCount).build();
+    WritableBatch b = new WritableBatch(batchDef, buffers);
       return b;
-    }
     
   }
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableFixed8.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableFixed8.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableFixed8.java
new file mode 100644
index 0000000..3fc39eb
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableFixed8.java
@@ -0,0 +1,36 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+/** {@link NullableValueVector} specialization that uses {@link Fixed8} as its value vector type. */
+public final class NullableFixed8 extends NullableValueVector<NullableFixed8, Fixed8>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NullableFixed8.class);
+
+  public NullableFixed8(MaterializedField field, BufferAllocator allocator) {
+    super(field, allocator);
+  }
+
+  /** Returns a new {@link Fixed8} constructed with this vector's field and the given allocator. */
+  @Override
+  protected Fixed8 getNewValueVector(BufferAllocator allocator) {
+    return new Fixed8(this.field, allocator);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector.java
index 02b75ce..3546bd8 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector.java
@@ -37,5 +37,13 @@ public class SelectionVector {
   public void allocateNew(int count) {
 
   }
+
+  /**
+   * Multiplies {@code index} by 4 and returns {@code data.getInt} at that offset.
+   * NOTE(review): this method was merged in from NullableFixed4.java; confirm that {@code data} is declared in this class.
+   */
+  public final int getInt(int index){
+    return data.getInt(index * 4);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector2.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector2.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector2.java
new file mode 100644
index 0000000..c314cd4
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector2.java
@@ -0,0 +1,46 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.DeadBuf;
+
+/**
+ * A selection vector that fronts, at most, 64K values: {@link #getIndex(int)} reads each entry as a 16-bit char.
+ */
+public class SelectionVector2{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector2.class);
+
+  private final BufferAllocator allocator;
+  private ByteBuf buffer = DeadBuf.DEAD_BUFFER; // never reassigned within this class
+
+  public SelectionVector2(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+  /** Currently always returns -1. */
+  public int getCount(){
+    return -1;
+  }
+  /** Returns {@code buffer.getChar(directIndex)} widened to int. NOTE(review): the index is not scaled by the 2-byte entry size - confirm callers pass a byte offset. */
+  public int getIndex(int directIndex){
+    return buffer.getChar(directIndex);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector4.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector4.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector4.java
new file mode 100644
index 0000000..d857146
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector4.java
@@ -0,0 +1,41 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.record.vector;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.DeadBuf;
+
+/** Selection vector skeleton: holds an allocator and a buffer that starts as {@link DeadBuf#DEAD_BUFFER}. */
+public class SelectionVector4 {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector4.class);
+
+  private final BufferAllocator allocator;
+  private ByteBuf buffer = DeadBuf.DEAD_BUFFER;
+
+  public SelectionVector4(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  /** Currently always returns -1. */
+  public int getCount(){
+    return -1;
+  }
+}