You are viewing a plain-text version of this content. The canonical link for it is here (the link target was not preserved in this plain-text copy).
Posted to dev@drill.apache.org by ja...@apache.org on 2013/07/20 03:58:12 UTC

[51/53] [abbrv] git commit: Initial working filter operator

Initial working filter operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/65e2cfce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/65e2cfce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/65e2cfce

Branch: refs/heads/master
Commit: 65e2cfce5364ccfe30b83b9dddeb304b79efbc76
Parents: 57de7ed
Author: Jacques Nadeau <ja...@apache.org>
Authored: Wed Jul 17 00:27:31 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Fri Jul 19 14:53:31 2013 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/cache/HazelCache.java |   1 -
 .../drill/exec/compile/ClassTransformer.java    |  17 +-
 .../drill/exec/compile/JDKClassCompiler.java    |   5 +-
 .../exec/compile/TemplateClassDefinition.java   |  24 +--
 .../apache/drill/exec/expr/CodeGenerator.java   |  20 +-
 .../drill/exec/expr/EvaluationVisitor.java      |  22 +-
 .../drill/exec/expr/fn/FunctionConverter.java   |   2 +-
 .../exec/expr/fn/MethodGrabbingVisitor.java     |   3 +-
 .../apache/drill/exec/ops/FragmentContext.java  |   9 +-
 .../drill/exec/physical/impl/ImplCreator.java   |  10 +-
 .../drill/exec/physical/impl/ScanBatch.java     |  11 +-
 .../exec/physical/impl/WireRecordBatch.java     |   8 +-
 .../physical/impl/filter/ExampleFilter.java     | 111 ----------
 .../impl/filter/FilterBatchCreator.java         |  23 +++
 .../physical/impl/filter/FilterEvaluator.java   |  10 +
 .../physical/impl/filter/FilterRecordBatch.java | 200 +++++++++++++++++++
 .../physical/impl/filter/FilterTemplate.java    |  63 ++++--
 .../exec/physical/impl/filter/Filterer.java     |  19 ++
 .../impl/filter/ReturnValueExpression.java      |  39 ++++
 .../SelectionVectorPopulationExpression.java    |  39 ----
 .../physical/impl/project/ProjectEvaluator.java |   4 +-
 .../impl/project/ProjectRecordBatch.java        |  11 +-
 .../exec/physical/impl/project/Projector.java   |   2 +-
 .../impl/project/ProjectorTemplate.java         |  10 +-
 .../apache/drill/exec/record/RecordBatch.java   |   3 +-
 .../drill/exec/record/RecordBatchLoader.java    |   4 +-
 .../apache/drill/exec/record/SchemaBuilder.java |   2 +-
 .../exec/record/selection/SelectionVector2.java |  36 +++-
 .../drill/exec/store/JSONRecordReader.java      |   3 +-
 .../exec/compile/TestClassTransformation.java   |   2 +-
 .../apache/drill/exec/expr/ExpressionTest.java  |   2 +-
 .../exec/physical/impl/SimpleRootExec.java      |   4 +
 .../physical/impl/filter/TestSimpleFilter.java  |  58 ++++++
 .../record/ExpressionTreeMaterializerTest.java  |   2 +-
 .../drill/exec/store/JSONRecordReaderTest.java  |   3 +-
 .../src/test/resources/filter/test1.json        |  34 ++++
 36 files changed, 566 insertions(+), 250 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
index f4fdbfa..fe4a212 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
@@ -29,7 +29,6 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
 import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
 
-import com.beust.jcommander.internal.Lists;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.protobuf.InvalidProtocolBufferException;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
index 4bf6e7e..d7cde2a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
@@ -44,11 +44,11 @@ import org.objectweb.asm.tree.ClassNode;
 import org.objectweb.asm.tree.FieldNode;
 import org.objectweb.asm.tree.MethodNode;
 
-import com.beust.jcommander.internal.Sets;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import com.google.common.io.Resources;
 
@@ -112,15 +112,16 @@ public class ClassTransformer {
       String materializedClassName) throws ClassTransformationException {
 
     try {
-
+      long t0 = System.nanoTime();
       final byte[] implementationClass = classLoader.getClassByteCode(materializedClassName, entireClass);
 
       // Get Template Class
       final String templateClassName = templateDefinition.getTemplateClassName().replaceAll("\\.", File.separator);
       final String templateClassPath = File.separator + templateClassName + ".class";
+      long t1 = System.nanoTime();
       final byte[] templateClass = getClassByteCodeFromPath(templateClassPath);
-      int fileNum = new Random().nextInt(100);
-      Files.write(templateClass, new File(String.format("/tmp/%d-template.class", fileNum)));
+//      int fileNum = new Random().nextInt(100);
+      //Files.write(templateClass, new File(String.format("/tmp/%d-template.class", fileNum)));
       // Generate Merge Class
 
       // Setup adapters for merging, remapping class names and class writing. This is done in reverse order of how they
@@ -130,7 +131,11 @@ public class ClassTransformer {
       RemapClasses remapper = new RemapClasses(oldTemplateSlashName, materializedSlashName);
       
       {
+        
         ClassNode impl = getClassNodeFromByteCode(implementationClass);
+        long t2 = System.nanoTime();
+        logger.debug("Compile {}, decode template {}", (t1 - t0)/1000/1000, (t2- t1)/1000/1000);
+        
         ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES);
 
         ClassVisitor remappingAdapter = new RemappingClassAdapter(cw, remapper);
@@ -139,7 +144,7 @@ public class ClassTransformer {
         ClassReader tReader = new ClassReader(templateClass);
         tReader.accept(mergingAdapter, ClassReader.EXPAND_FRAMES);
         byte[] outputClass = cw.toByteArray();
-        Files.write(outputClass, new File(String.format("/tmp/%d-output.class", fileNum)));
+//        Files.write(outputClass, new File(String.format("/tmp/%d-output.class", fileNum)));
         outputClass = cw.toByteArray();
 
         // Load the class
@@ -160,7 +165,7 @@ public class ClassTransformer {
         reader.accept(remap, ClassReader.EXPAND_FRAMES);
         byte[] newByteCode = subcw.toByteArray();
         classLoader.injectByteCode(s.replace(oldTemplateSlashName, materializedSlashName).replace('/', '.'), newByteCode);
-        Files.write(subcw.toByteArray(), new File(String.format("/tmp/%d-sub-%d.class", fileNum, i)));
+//        Files.write(subcw.toByteArray(), new File(String.format("/tmp/%d-sub-%d.class", fileNum, i)));
         i++;
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JDKClassCompiler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JDKClassCompiler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JDKClassCompiler.java
index 15e87fe..8f6e572 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JDKClassCompiler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JDKClassCompiler.java
@@ -30,9 +30,9 @@ import java.util.List;
 import javax.tools.Diagnostic;
 import javax.tools.DiagnosticListener;
 import javax.tools.JavaCompiler;
+import javax.tools.JavaCompiler.CompilationTask;
 import javax.tools.JavaFileManager;
 import javax.tools.JavaFileObject;
-import javax.tools.JavaCompiler.CompilationTask;
 import javax.tools.JavaFileObject.Kind;
 import javax.tools.SimpleJavaFileObject;
 import javax.tools.StandardLocation;
@@ -43,7 +43,8 @@ import org.codehaus.commons.compiler.Location;
 import org.codehaus.commons.compiler.jdk.ByteArrayJavaFileManager;
 import org.codehaus.commons.compiler.jdk.ByteArrayJavaFileManager.ByteArrayJavaFileObject;
 
-import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.Lists;
+
 
 class JDKClassCompiler implements ClassCompiler {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JDKClassCompiler.class);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
index 5a01dce..20ef361 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
@@ -17,7 +17,6 @@
  ******************************************************************************/
 package org.apache.drill.exec.compile;
 
-import java.lang.reflect.Method;
 
 
 public class TemplateClassDefinition<T>{
@@ -25,24 +24,19 @@ public class TemplateClassDefinition<T>{
   private final Class<T> externalInterface;
   private final String templateClassName;
   private final Class<?> internalInterface;
-  private final String setupName;
-  private final String evalName;
-  
+  private final Class<?> evalReturnType;
 
-  public TemplateClassDefinition(Class<T> externalInterface, String templateClassName, Class<?> internalInterface, String setupName, String evalName) {
+  public TemplateClassDefinition(Class<T> externalInterface, String templateClassName, Class<?> internalInterface, Class<?> evalReturnType) {
     super();
     this.externalInterface = externalInterface;
     this.templateClassName = templateClassName;
     this.internalInterface = internalInterface;
-    this.setupName = setupName;
-    this.evalName = evalName;
+    this.evalReturnType = evalReturnType;
   }
 
-
   public Class<T> getExternalInterface() {
     return externalInterface;
   }
-
   
   public Class<?> getInternalInterface() {
     return internalInterface;
@@ -52,16 +46,8 @@ public class TemplateClassDefinition<T>{
     return templateClassName;
   }
 
-  public String getSetupName() {
-    return setupName;
+  public Class<?> getEvalReturnType() {
+    return evalReturnType;
   }
-
-
-  public String getEvalName() {
-    return evalName;
-  }
-  
-  
-  
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
index ed6bd9b..241c1cc 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
@@ -1,10 +1,13 @@
 package org.apache.drill.exec.expr;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 
+import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.expr.holders.BooleanHolder;
 import org.apache.drill.exec.expr.holders.IntHolder;
@@ -36,8 +39,6 @@ public class CodeGenerator<T> {
   private JBlock currentEvalBlock;
   private JBlock currentSetupBlock;
   private final EvaluationVisitor evaluationVisitor;
-  private final String setupName;
-  private final String perRecordName;
   private final TemplateClassDefinition<T> definition;
   private JCodeModel model;
   private int index = 0;
@@ -46,8 +47,6 @@ public class CodeGenerator<T> {
     super();
     try{
       this.definition = definition;
-      this.setupName = definition.getSetupName();
-      this.perRecordName = definition.getEvalName();
       this.model = new JCodeModel();
       this.clazz = model._package("org.apache.drill.exec.test.generated")._class("Test1");
       clazz._implements(definition.getInternalInterface());
@@ -59,7 +58,7 @@ public class CodeGenerator<T> {
     }
   }
 
-  public void addNextWrite(ValueVectorWriteExpression ex){
+  public void addExpr(LogicalExpression ex){
     logger.debug("Adding next write {}", ex);
     currentEvalBlock = new JBlock();
     parentEvalBlock.add(currentEvalBlock);
@@ -80,20 +79,27 @@ public class CodeGenerator<T> {
     return currentSetupBlock;
   }
   
+  
+  public TemplateClassDefinition<T> getDefinition() {
+    return definition;
+  }
+
   public String generate() throws IOException{
 
     {
       //setup method
-      JMethod m = clazz.method(JMod.PUBLIC, model.VOID, this.setupName);
+      JMethod m = clazz.method(JMod.PUBLIC, model.VOID, "doSetup");
       m.param(model._ref(FragmentContext.class), "context");
       m.param(model._ref(RecordBatch.class), "incoming");
       m.param(model._ref(RecordBatch.class), "outgoing");
+      m._throws(SchemaChangeException.class);
       m.body().add(parentSetupBlock);
     }
     
     {
       // eval method.
-      JMethod m = clazz.method(JMod.PUBLIC, model.VOID, this.perRecordName);
+      JType ret = definition.getEvalReturnType() == null ? model.VOID : model._ref(definition.getEvalReturnType());
+      JMethod m = clazz.method(JMod.PUBLIC, ret, "doEval");  
       m.param(model.INT, "inIndex");
       m.param(model.INT, "outIndex");
       m.body().add(parentEvalBlock);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
index 6b0e499..c219d9c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
@@ -14,7 +14,7 @@ import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.expr.CodeGenerator.HoldingContainer;
 import org.apache.drill.exec.expr.fn.FunctionHolder;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
-import org.apache.drill.exec.physical.impl.filter.SelectionVectorPopulationExpression;
+import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.vector.TypeHelper;
 
@@ -117,7 +117,7 @@ public class EvaluationVisitor extends AbstractExprVisitor<HoldingContainer, Cod
   @Override
   public HoldingContainer visitBooleanConstant(BooleanExpression e, CodeGenerator<?> generator) throws RuntimeException {
     HoldingContainer out = generator.declare(e.getMajorType());
-    generator.getBlock().assign(out.getValue(), JExpr.lit(e.getBoolean()));
+    generator.getBlock().assign(out.getValue(), JExpr.lit(e.getBoolean() ? 1 : 0));
     return out;
   }
   
@@ -127,8 +127,8 @@ public class EvaluationVisitor extends AbstractExprVisitor<HoldingContainer, Cod
       return visitValueVectorExpression((ValueVectorReadExpression) e, generator);
     }else if(e instanceof ValueVectorWriteExpression){
       return visitValueVectorWriteExpression((ValueVectorWriteExpression) e, generator);
-    }else if(e instanceof SelectionVectorPopulationExpression){
-      return visitSelectionVectorExpression((SelectionVectorPopulationExpression) e, generator);
+    }else if(e instanceof ReturnValueExpression){
+      return visitReturnValueExpression((ReturnValueExpression) e, generator);
     }else{
       return super.visitUnknown(e, generator);  
     }
@@ -196,21 +196,11 @@ public class EvaluationVisitor extends AbstractExprVisitor<HoldingContainer, Cod
   }
   
   
-  private HoldingContainer visitSelectionVectorExpression(SelectionVectorPopulationExpression e, CodeGenerator<?> generator){
-    JType svClass = generator.getModel()._ref(SelectionVector2.class);
-    JVar sv = generator.declareClassField("sv", svClass);
-    JVar index = generator.declareClassField("svIndex", generator.getModel().CHAR);
+  private HoldingContainer visitReturnValueExpression(ReturnValueExpression e, CodeGenerator<?> generator){
     LogicalExpression child = e.getChild();
     Preconditions.checkArgument(child.getMajorType().equals(Types.REQUIRED_BOOLEAN));
     HoldingContainer hc = child.accept(this, generator);
-    generator.getBlock()._return(hc.getValue());
-    
-//    JBlock blk = generator.getSetupBlock();
-//    blk.assign(sv, JExpr.direct("outgoing").invoke("getSelectionVector2"));
-//    JConditional jc = blk._if(hc.getValue());
-//    JBlock body = jc._then();
-//    body.add(sv.invoke("set").arg(index).arg(JExpr.direct("inIndex")));
-//    body.assign(index, index.plus(JExpr.lit(1)));
+    generator.getBlock()._return(hc.getValue().eq(JExpr.lit(1)));
     return null;
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
index 84f04f0..8e0f1be 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
@@ -25,7 +25,7 @@ import org.codehaus.janino.Parser;
 import org.codehaus.janino.Scanner;
 import org.mortbay.util.IO;
 
-import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.Lists;
 import com.google.common.io.InputSupplier;
 import com.google.common.io.Resources;
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/MethodGrabbingVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/MethodGrabbingVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/MethodGrabbingVisitor.java
index 22b9046..57268ee 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/MethodGrabbingVisitor.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/MethodGrabbingVisitor.java
@@ -8,7 +8,8 @@ import org.codehaus.janino.Java.ClassDeclaration;
 import org.codehaus.janino.Java.MethodDeclarator;
 import org.codehaus.janino.util.Traverser;
 
-import com.beust.jcommander.internal.Maps;
+import com.google.common.collect.Maps;
+
 
 public class MethodGrabbingVisitor{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MethodGrabbingVisitor.class);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 876b873..2ae4afa 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -103,11 +103,14 @@ public class FragmentContext {
     return context.getAllocator();
   }
 
-  public <T> T getImplementationClass(TemplateClassDefinition<T> templateDefinition, CodeGenerator<T> cg) throws ClassTransformationException, IOException{
-    return transformer.getImplementationClass(this.loader, templateDefinition, cg.generate(), cg.getMaterializedClassName());
+  public <T> T getImplementationClass(CodeGenerator<T> cg) throws ClassTransformationException, IOException{
+    long t1 = System.nanoTime();
+    T t= transformer.getImplementationClass(this.loader, cg.getDefinition(), cg.generate(), cg.getMaterializedClassName());
+    System.out.println( (System.nanoTime() - t1)/1000/1000 );
+    return t;
+    
   }
   
-
   public void addMetricsToStatus(FragmentStatus.Builder stats){
     stats.setBatchesCompleted(batchesCompleted.get());
     stats.setDataProcessed(dataProcessed.get());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 739c0d4..f96d6f3 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -26,12 +26,14 @@ import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Scan;
+import org.apache.drill.exec.physical.config.Filter;
 import org.apache.drill.exec.physical.config.MockScanBatchCreator;
 import org.apache.drill.exec.physical.config.MockScanPOP;
 import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.physical.config.RandomReceiver;
 import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.config.SingleSender;
+import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator;
 import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator;
 import org.apache.drill.exec.record.RecordBatch;
 
@@ -46,6 +48,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
   private RandomReceiverCreator rrc = new RandomReceiverCreator();
   private SingleSenderCreator ssc = new SingleSenderCreator();
   private ProjectBatchCreator pbc = new ProjectBatchCreator();
+  private FilterBatchCreator fbc = new FilterBatchCreator();
   private RootExec root = null;
   
   private ImplCreator(){}
@@ -78,10 +81,13 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
     root = sc.getRoot(context, op, getChildren(op, context));
     return null;
   }
-
-  
   
   @Override
+  public RecordBatch visitFilter(Filter filter, FragmentContext context) throws ExecutionSetupException {
+    return fbc.getBatch(context, filter, getChildren(filter, context));
+  }
+
+  @Override
   public RecordBatch visitSingleSender(SingleSender op, FragmentContext context) throws ExecutionSetupException {
     root = ssc.getRoot(context, op, getChildren(op, context));
     return null;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 5688bb1..084db54 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -26,7 +26,6 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.InvalidValueAccessor;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SchemaBuilder;
@@ -36,9 +35,8 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.vector.ValueVector;
 
-import com.beust.jcommander.internal.Lists;
-import com.beust.jcommander.internal.Maps;
-import com.carrotsearch.hppc.procedures.IntObjectProcedure;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 /**
  * Record batch used for a particular scan. Operators against one or more
@@ -171,6 +169,11 @@ public class ScanBatch implements RecordBatch {
   }
 
   @Override
+  public Iterator<ValueVector> iterator() {
+    return vectors.iterator();
+  }
+
+  @Override
   public WritableBatch getWritableBatch() {
     return WritableBatch.get(this.getRecordCount(), vectors);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
index d2b8bfd..a575f69 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -17,6 +17,8 @@
  ******************************************************************************/
 package org.apache.drill.exec.physical.impl;
 
+import java.util.Iterator;
+
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -66,9 +68,13 @@ public class WireRecordBatch implements RecordBatch{
   public void kill() {
     fragProvider.kill(context);
   }
-
   
   @Override
+  public Iterator<ValueVector> iterator() {
+    return batchLoader.iterator();
+  }
+
+  @Override
   public SelectionVector2 getSelectionVector2() {
     throw new UnsupportedOperationException();
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ExampleFilter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ExampleFilter.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ExampleFilter.java
deleted file mode 100644
index 85f598f..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ExampleFilter.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-
-package org.apache.drill.exec.physical.impl.filter;
-
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.ValueVector;
-
-public class ExampleFilter implements RecordBatch {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExampleFilter.class);
-
-  //private EvalutationPredicates []
-  private RecordBatch incoming;
-  private BatchSchema outboundSchema;
-  private int recordCount;
-
-  private void reconfigureSchema() throws SchemaChangeException {
-    BatchSchema in = incoming.getSchema();
-    outboundSchema = BatchSchema.newBuilder().addFields(in).setSelectionVectorMode(BatchSchema.SelectionVectorMode.TWO_BYTE).build();
-  }
-
-  private int generateSelectionVector(){
-                    return -1;
-  }
-
-  @Override
-  public FragmentContext getContext() {
-    return incoming.getContext();
-  }
-
-  @Override
-  public BatchSchema getSchema() {
-    return outboundSchema;
-  }
-
-  @Override
-  public int getRecordCount() {
-    return recordCount;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public void kill() {
-    //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public SelectionVector2 getSelectionVector2() {
-    return null;
-  }
-
-  @Override
-  public SelectionVector4 getSelectionVector4() {
-    return null;
-  }
-
-  @Override
-  public TypedFieldId getValueVectorId(SchemaPath path) {
-    return null;
-  }
-
-  @Override
-  public <T extends ValueVector> T getValueVectorById(int fieldId, Class<?> vvClass) {
-    return null;
-  }
-
-  @Override
-  public IterOutcome next() {
-    IterOutcome out = incoming.next();
-    switch (incoming.next()) {
-
-      case NONE:
-        return IterOutcome.NONE;
-      case OK_NEW_SCHEMA:
-        //reconfigureSchema();
-      case OK:
-        this.recordCount = generateSelectionVector();
-        return out;
-      case STOP:
-        return IterOutcome.STOP;
-      default:
-        throw new UnsupportedOperationException();
-    }
-  }
-
-  @Override
-  public WritableBatch getWritableBatch() {
-    return null;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
new file mode 100644
index 0000000..df2518b
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
@@ -0,0 +1,23 @@
+package org.apache.drill.exec.physical.impl.filter;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import com.google.common.base.Preconditions;
+
+public class FilterBatchCreator implements BatchCreator<Filter>{ // BatchCreator that turns a Filter config into a FilterRecordBatch
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterBatchCreator.class);
+
+  @Override
+  public RecordBatch getBatch(FragmentContext context, Filter config, List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.size() == 1); // exactly one input is required; otherwise IllegalArgumentException
+    return new FilterRecordBatch(config, children.iterator().next(), context); // wrap the single child batch
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterEvaluator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterEvaluator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterEvaluator.java
new file mode 100644
index 0000000..0fad224
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterEvaluator.java
@@ -0,0 +1,10 @@
+package org.apache.drill.exec.physical.impl.filter;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+
+public interface FilterEvaluator { // evaluator interface named in Filterer.TEMPLATE_DEFINITION; FilterTemplate declares abstract methods with the same names
+  public void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException; // FilterTemplate.setup() calls its doSetup last
+  public boolean doEval(int inIndex, int outIndex); // FilterTemplate keeps the record when its doEval returns true
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
new file mode 100644
index 0000000..fc9dbc6
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -0,0 +1,200 @@
+package org.apache.drill.exec.physical.impl.filter;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.ValueVectorReadExpression;
+import org.apache.drill.exec.expr.ValueVectorWriteExpression;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.impl.VectorHolder;
+import org.apache.drill.exec.physical.impl.project.Projector;
+import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
+import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
+import org.apache.drill.exec.proto.SchemaDefProtos.NamePart.Type;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.SchemaBuilder;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.NonRepeatedMutator;
+import org.apache.drill.exec.vector.TypeHelper;
+import org.apache.drill.exec.vector.ValueVector;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class FilterRecordBatch implements RecordBatch{ // filters `incoming` by recording the passing rows in a SelectionVector2
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
+
+  private final Filter filterConfig; // supplies the filter expression through getExpr()
+  private final RecordBatch incoming; // upstream batch being filtered
+  private final FragmentContext context;
+  private final SelectionVector2 sv; // re-allocated for every OK / OK_NEW_SCHEMA batch in next()
+  private BatchSchema outSchema; // assigned in createNewFilterer(); null before the first OK_NEW_SCHEMA
+  private Filterer filter; // generated implementation, rebuilt on every OK_NEW_SCHEMA
+  private List<ValueVector> outputVectors; // transfer targets, one per incoming vector
+  private VectorHolder vh; // lookup wrapper built over outputVectors
+  
+  public FilterRecordBatch(Filter pop, RecordBatch incoming, FragmentContext context){
+    this.filterConfig = pop;
+    this.incoming = incoming;
+    this.context = context;
+    sv = new SelectionVector2(context.getAllocator()); // no buffer yet; next() calls allocateNew per batch
+  }
+  
+  
+  @Override
+  public FragmentContext getContext() {
+    return context;
+  }
+
+  @Override
+  public BatchSchema getSchema() {
+    Preconditions.checkNotNull(outSchema); // NullPointerException if no schema has been built yet
+    return outSchema;
+  }
+
+  @Override
+  public int getRecordCount() {
+    return sv.getCount(); // number of selected records, as stored by the Filterer
+  }
+
+  @Override
+  public void kill() {
+    incoming.kill(); // NOTE(review): sv is not cleared here - confirm who releases its buffer
+  }
+
+  
+  @Override
+  public Iterator<ValueVector> iterator() {
+    return outputVectors.iterator(); // outputVectors is null until createNewFilterer() has run
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    return sv;
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    throw new UnsupportedOperationException(); // this batch only produces a two-byte selection vector
+  }
+
+  @Override
+  public TypedFieldId getValueVectorId(SchemaPath path) {
+    return vh.getValueVector(path);
+  }
+
+  @Override
+  public <T extends ValueVector> T getValueVectorById(int fieldId, Class<?> clazz) {
+    return vh.getValueVector(fieldId, clazz);
+  }
+
+  @Override
+  public IterOutcome next() {
+    
+    IterOutcome upstream = incoming.next();
+    logger.debug("Upstream... {}", upstream);
+    switch(upstream){
+    case NONE:
+    case NOT_YET:
+    case STOP:
+      return upstream; // nothing to filter; pass the outcome straight through
+    case OK_NEW_SCHEMA:
+      try{
+        filter = createNewFilterer(); // rebuild output vectors, schema and generated filter for the new schema
+      }catch(SchemaChangeException ex){
+        incoming.kill(); // stop upstream, report the failure to the fragment, and end iteration
+        logger.error("Failure during query", ex);
+        context.fail(ex);
+        return IterOutcome.STOP;
+      }
+      // fall through.
+    case OK:
+      int recordCount = incoming.getRecordCount();
+      sv.allocateNew(recordCount); // one selection entry per incoming record
+      filter.filterBatch(recordCount); // NOTE(review): filter is still null if OK arrives before any OK_NEW_SCHEMA
+      for(ValueVector v : this.outputVectors){
+        ValueVector.Mutator m = v.getMutator();
+        if(m instanceof NonRepeatedMutator){
+          ((NonRepeatedMutator) m).setValueCount(recordCount); // incoming record count, not the filtered count
+        }else{
+          throw new UnsupportedOperationException(); // only NonRepeatedMutator vectors are handled
+        }
+      }
+      return upstream; // change if upstream changed, otherwise normal.
+    default:
+      throw new UnsupportedOperationException();
+    }
+  }
+  
+
+  private Filterer createNewFilterer() throws SchemaChangeException{ // builds output vectors, output schema and the generated Filterer for the current incoming schema
+    if(outputVectors != null){
+      for(ValueVector v : outputVectors){
+        v.close(); // close the vectors created for the previous schema
+      }
+    }
+    this.outputVectors = Lists.newArrayList();
+    this.vh = new VectorHolder(outputVectors);
+    LogicalExpression filterExpression = filterConfig.getExpr();
+    final ErrorCollector collector = new ErrorCollectorImpl();
+    final List<TransferPair> transfers = Lists.newArrayList();
+    final CodeGenerator<Filterer> cg = new CodeGenerator<Filterer>(Filterer.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+    
+    final LogicalExpression expr = ExpressionTreeMaterializer.materialize(filterExpression, incoming, collector); // materialize the expression against the incoming batch
+    if(collector.hasErrors()){
+      throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
+    }
+    
+    cg.addExpr(new ReturnValueExpression(expr)); // hand the wrapped filter expression to the code generator
+    
+    for(ValueVector v : incoming){
+      TransferPair pair = v.getTransferPair(); // one pair per incoming vector; its target becomes an output vector
+      outputVectors.add(pair.getTo());
+      transfers.add(pair);
+    }
+    
+    SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(SelectionVectorMode.TWO_BYTE); // output schema always declares a two-byte selection vector
+    for(ValueVector v : outputVectors){
+      bldr.addField(v.getField());
+    }
+    this.outSchema = bldr.build();
+    
+    try {
+      TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]);
+      Filterer filterer = context.getImplementationClass(cg); // obtain the generated Filterer implementation
+      filterer.setup(context, incoming, this, tx);
+      return filterer;
+    } catch (ClassTransformationException | IOException e) {
+      throw new SchemaChangeException("Failure while attempting to load generated class", e); // keep the cause; next() turns this into STOP
+    }
+  }
+  
+  @Override
+  public WritableBatch getWritableBatch() {
+    return WritableBatch.get(sv.getCount(), outputVectors);
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
index 216bfec..4092911 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
@@ -1,18 +1,27 @@
 package org.apache.drill.exec.physical.impl.filter;
 
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 
-public abstract class FilterTemplate {
+public abstract class FilterTemplate implements Filterer{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterTemplate.class);
   
-  SelectionVector2 outgoingSelectionVector;
-  SelectionVector2 incomingSelectionVector;
+  private SelectionVector2 outgoingSelectionVector;
+  private SelectionVector2 incomingSelectionVector;
+  private SelectionVectorMode svMode;
+  private TransferPair[] transfers;
   
-  public void setup(RecordBatch incoming, RecordBatch outgoing){
-    outgoingSelectionVector = outgoing.getSelectionVector2();
-
-    switch(incoming.getSchema().getSelectionVector()){
+  @Override
+  public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException{
+    this.transfers = transfers;
+    this.outgoingSelectionVector = outgoing.getSelectionVector2();
+    this.svMode = incoming.getSchema().getSelectionVector();
+    
+    switch(svMode){
     case NONE:
       break;
     case TWO_BYTE:
@@ -21,28 +30,54 @@ public abstract class FilterTemplate {
     default:
       throw new UnsupportedOperationException();
     }
+    doSetup(context, incoming, outgoing);
+  }
+
+  private void doTransfers(){ // invoked at the start of every filterBatch() call
+    for(TransferPair t : transfers){
+      t.transfer(); // presumably hands the incoming vector's data to its output counterpart - confirm in TransferPair
+    }
   }
   
-  public void filterBatchSV2(int recordCount){
+  public void filterBatch(int recordCount){ // runs the transfers, then fills the outgoing selection vector
+    doTransfers();
+    switch(svMode){ // selection vector mode of the incoming schema, captured in setup()
+    case NONE:
+      filterBatchNoSV(recordCount);
+      break;
+    case TWO_BYTE:
+      filterBatchSV2(recordCount);
+      break;
+    default:
+      throw new UnsupportedOperationException(); // no other selection vector mode is handled
+    }
+  }
+  
+  private void filterBatchSV2(int recordCount){ // filters a batch that already arrives with a two-byte selection vector
     int svIndex = 0;
-    for(char i =0; i < recordCount; i++){
-      if(include(i)){
-        outgoingSelectionVector.setIndex(svIndex, i);
+    final int count = recordCount*2; // getIndex/setIndex address the buffer by byte offset, two bytes per entry
+    for(int i = 0; i < count; i+=2){
+      char index = incomingSelectionVector.getIndex(i); // record position selected by the incoming vector
+      if(doEval(index, 0)){ // evaluate the selected record; i is only a byte offset into the selection vector
+        outgoingSelectionVector.setIndex(svIndex, index);
         svIndex+=2;
       }
     }
+    outgoingSelectionVector.setRecordCount(svIndex/2); // svIndex advanced two bytes per kept record
   }
   
-  public void filterBatchNoSV(int recordCount){
+  private void filterBatchNoSV(int recordCount){ // filters a batch that has no incoming selection vector
     int svIndex = 0;
-    for(char i =0; i < recordCount; i++){
+    for(int i = 0; i < recordCount; i++){ // int counter: a char counter wraps to 0 after 65535 and would never reach a recordCount of 65536
       
-      if(include(i)){
-        outgoingSelectionVector.setIndex(svIndex, i);
+      if(doEval(i, 0)){
+        outgoingSelectionVector.setIndex(svIndex, (char) i); // entries are two-byte chars, so positions above 65535 cannot be represented
         svIndex+=2;
       }
     }
+    outgoingSelectionVector.setRecordCount(svIndex/2); // svIndex advanced two bytes per kept record
   }
   
-  protected abstract boolean include(int index);
+  protected abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException; // called as the last step of setup()
+  protected abstract boolean doEval(int inIndex, int outIndex); // true keeps the record; this class always passes outIndex 0
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
new file mode 100644
index 0000000..b270869
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
@@ -0,0 +1,19 @@
+package org.apache.drill.exec.physical.impl.filter;
+
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.project.Projector;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+
+public interface Filterer { // contract implemented by FilterTemplate and driven by FilterRecordBatch
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Filterer.class);
+  
+  public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException; // FilterRecordBatch calls this once on each newly generated instance
+  public void filterBatch(int recordCount); // FilterRecordBatch passes the incoming batch's record count
+  
+  public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION = new TemplateClassDefinition<Filterer>( //
+      Filterer.class, "org.apache.drill.exec.physical.impl.filter.FilterTemplate", FilterEvaluator.class, boolean.class); // boolean.class is presumably the return type of the generated doEval - confirm in TemplateClassDefinition
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ReturnValueExpression.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ReturnValueExpression.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ReturnValueExpression.java
new file mode 100644
index 0000000..a794d63
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ReturnValueExpression.java
@@ -0,0 +1,39 @@
+package org.apache.drill.exec.physical.impl.filter;
+
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.visitors.ExprVisitor;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+
+public class ReturnValueExpression implements LogicalExpression{ // wrapper node; FilterRecordBatch wraps the materialized filter condition in it before code generation
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ReturnValueExpression.class);
+
+  private LogicalExpression child; // the wrapped expression
+  
+  public ReturnValueExpression(LogicalExpression child) {
+    this.child = child;
+  }
+
+  public LogicalExpression getChild() {
+    return child;
+  }
+
+  @Override
+  public MajorType getMajorType() {
+    return Types.NULL; // the wrapper reports Types.NULL rather than the child's type
+  }
+
+  @Override
+  public <T, V, E extends Exception> T accept(ExprVisitor<T, V, E> visitor, V value) throws E {
+    return visitor.visitUnknown(this, value); // dispatched through the visitor's generic visitUnknown hook
+  }
+
+  @Override
+  public ExpressionPosition getPosition() {
+    return ExpressionPosition.UNKNOWN; // the wrapper carries no source position of its own
+  }
+  
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/SelectionVectorPopulationExpression.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/SelectionVectorPopulationExpression.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/SelectionVectorPopulationExpression.java
deleted file mode 100644
index f253695..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/SelectionVectorPopulationExpression.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package org.apache.drill.exec.physical.impl.filter;
-
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.visitors.ExprVisitor;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-
-public class SelectionVectorPopulationExpression implements LogicalExpression{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVectorPopulationExpression.class);
-
-  private LogicalExpression child;
-  
-  public SelectionVectorPopulationExpression(LogicalExpression child) {
-    this.child = child;
-  }
-
-  public LogicalExpression getChild() {
-    return child;
-  }
-
-  @Override
-  public MajorType getMajorType() {
-    return Types.NULL;
-  }
-
-  @Override
-  public <T, V, E extends Exception> T accept(ExprVisitor<T, V, E> visitor, V value) throws E {
-    return visitor.visitUnknown(this, value);
-  }
-
-  @Override
-  public ExpressionPosition getPosition() {
-    return ExpressionPosition.UNKNOWN;
-  }
-  
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
index 86caf28..5fd1fb4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
@@ -7,6 +7,6 @@ import org.apache.drill.exec.record.RecordBatch;
 public interface ProjectEvaluator {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectEvaluator.class);
   
-  public abstract void setupEvaluators(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
-  public abstract void doPerRecordWork(int inIndex, int outIndex);
+  public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException; // same name and signature as FilterEvaluator.doSetup
+  public abstract void doEval(int inIndex, int outIndex); // same name as FilterEvaluator.doEval, but returns nothing
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 3d1e3f7..060cd92 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -1,6 +1,7 @@
 package org.apache.drill.exec.physical.impl.project;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.drill.common.expression.ErrorCollector;
@@ -59,7 +60,11 @@ public class ProjectRecordBatch implements RecordBatch{
     this.context = context;
   }
   
-  
+  @Override
+  public Iterator<ValueVector> iterator() { // needed because RecordBatch now extends Iterable<ValueVector>
+    return outputVectors.iterator();
+  }
+
   @Override
   public FragmentContext getContext() {
     return context;
@@ -180,7 +185,7 @@ public class ProjectRecordBatch implements RecordBatch{
         allocationVectors.add(vector);
         outputVectors.add(vector);
         ValueVectorWriteExpression write = new ValueVectorWriteExpression(outputVectors.size() - 1, expr);
-        cg.addNextWrite(write);
+        cg.addExpr(write);
       }
       
     }
@@ -192,7 +197,7 @@ public class ProjectRecordBatch implements RecordBatch{
     this.outSchema = bldr.build();
     
     try {
-      Projector projector = context.getImplementationClass(Projector.TEMPLATE_DEFINITION, cg);
+      Projector projector = context.getImplementationClass(cg);
       projector.setup(context, incoming, this, transfers);
       return projector;
     } catch (ClassTransformationException | IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
index 2787f0c..0d1e201 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
@@ -16,6 +16,6 @@ public interface Projector {
   public abstract int projectRecords(int recordCount, int firstOutputIndex);
 
   public static TemplateClassDefinition<Projector> TEMPLATE_DEFINITION = new TemplateClassDefinition<Projector>( //
-      Projector.class, "org.apache.drill.exec.physical.impl.project.ProjectorTemplate", ProjectEvaluator.class, "setupEvaluators", "doPerRecordWork");
+      Projector.class, "org.apache.drill.exec.physical.impl.project.ProjectorTemplate", ProjectEvaluator.class, null);
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
index 486c7b0..735d355 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
@@ -33,7 +33,7 @@ public abstract class ProjectorTemplate implements Projector {
     case TWO_BYTE:
       final int count = recordCount*2;
       for(int i = 0; i < count; i+=2, firstOutputIndex++){
-        doPerRecordWork(vector2.getIndex(i), firstOutputIndex);
+        doEval(vector2.getIndex(i), firstOutputIndex);
       }
       return recordCount;
       
@@ -45,7 +45,7 @@ public abstract class ProjectorTemplate implements Projector {
       }
       final int countN = recordCount;
       for (int i = 0; i < countN; i++, firstOutputIndex++) {
-        doPerRecordWork(i, firstOutputIndex);
+        doEval(i, firstOutputIndex);
       }
       return recordCount;
       
@@ -68,11 +68,11 @@ public abstract class ProjectorTemplate implements Projector {
       break;
     }
     this.transfers = ImmutableList.copyOf(transfers);
-    setupEvaluators(context, incoming, outgoing);
+    setupEval(context, incoming, outgoing);
   }
 
-  protected abstract void setupEvaluators(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
-  protected abstract void doPerRecordWork(int inIndex, int outIndex);
+  protected abstract void setupEval(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
+  protected abstract void doEval(int inIndex, int outIndex);
 
   
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index 650a148..ff856d4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -29,7 +29,7 @@ import org.apache.drill.exec.vector.ValueVector;
  * composed of ValueVectors, ideally a batch fits within L2 cache (~256k per core). The set of value vectors do not
  * change unless the next() IterOutcome is a *_NEW_SCHEMA type.
  */
-public interface RecordBatch {
+public interface RecordBatch extends Iterable<ValueVector>{
 
   /**
    * Describes the outcome of a RecordBatch being incremented forward.
@@ -88,6 +88,7 @@ public interface RecordBatch {
   public abstract TypedFieldId getValueVectorId(SchemaPath path);
   
   
+  
   public abstract <T extends ValueVector> T getValueVectorById(int fieldId, Class<?> clazz);
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index e2a1648..5f7648b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -33,9 +33,9 @@ import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
 import org.apache.drill.exec.vector.TypeHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
-import com.beust.jcommander.internal.Lists;
-import com.beust.jcommander.internal.Maps;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 public class RecordBatchLoader implements Iterable<ValueVector>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchLoader.class);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
index 0989c1d..34e4043 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
@@ -23,8 +23,8 @@ import java.util.Set;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 
-import com.beust.jcommander.internal.Sets;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
  * A reusable builder that supports the creation of BatchSchemas. Can have a supporting expected object. If the expected Schema object is defined, the

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
index cdc136e..88f0c79 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
@@ -19,16 +19,20 @@ package org.apache.drill.exec.record.selection;
 
 import io.netty.buffer.ByteBuf;
 
+import java.io.Closeable;
+import java.io.IOException;
+
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.DeadBuf;
 
 /**
  * A selection vector that fronts, at most, a
  */
-public class SelectionVector2{
+public class SelectionVector2 implements Closeable{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector2.class);
 
   private final BufferAllocator allocator;
+  private int recordCount;
   private ByteBuf buffer = DeadBuf.DEAD_BUFFER;
 
   public SelectionVector2(BufferAllocator allocator) {
@@ -36,14 +40,40 @@ public class SelectionVector2{
   }
 
   public int getCount(){
-    return -1;
+    return recordCount; // value last stored by setRecordCount(); clear() resets it to 0 when it releases a buffer
   }
 
-  public int getIndex(int directIndex){
+  public char getIndex(int directIndex){ // directIndex is passed unchanged to ByteBuf.getChar, so it is a byte offset
     return buffer.getChar(directIndex);
   }
 
   public void setIndex(int directIndex, char value){
     buffer.setChar(directIndex, value);
   }
+  
+  public void allocateNew(int size){ // size is a number of entries, not bytes
+    clear(); // release any previously allocated buffer first
+    buffer = allocator.buffer(size * 2); // two bytes per entry
+  }
+  
+  
+  public void clear() { // releases the buffer and zeroes the count; does nothing when no buffer is allocated
+    if (buffer != DeadBuf.DEAD_BUFFER) {
+      buffer.release();
+      buffer = DeadBuf.DEAD_BUFFER; // back to the placeholder the field is initialised with
+      recordCount = 0;
+    }
+  }
+  
+  public void setRecordCount(int recordCount){ // stores the count later returned by getCount()
+    logger.debug("Setting record count to {}", recordCount);
+    this.recordCount = recordCount;
+  }
+
+  @Override
+  public void close() throws IOException {
+    clear(); // closing is the same as clearing
+  }
+  
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
index 8513dfe..3a57410 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
@@ -39,13 +39,14 @@ import org.apache.drill.exec.vector.NullableVarChar4Vector;
 import org.apache.drill.exec.vector.TypeHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
-import com.beust.jcommander.internal.Maps;
+
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
 import com.google.common.base.Charsets;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.io.Files;
 import com.google.common.io.InputSupplier;
 import com.google.common.io.Resources;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
index cb1e1d6..d2889ed 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
@@ -40,7 +40,7 @@ public class TestClassTransformation {
 
     TemplateClassDefinition<ExampleExternalInterface> def = new TemplateClassDefinition<ExampleExternalInterface>(
         ExampleExternalInterface.class, "org.apache.drill.exec.compile.ExampleTemplate",
-        ExampleInternalInterface.class, "a", "b");
+        ExampleInternalInterface.class, null);
     
     
     ClassTransformer ct = new ClassTransformer();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
index 623af0e..c610374 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
@@ -92,7 +92,7 @@ public class ExpressionTest {
     }
 
     CodeGenerator<Projector> cg = new CodeGenerator<Projector>(Projector.TEMPLATE_DEFINITION, new FunctionImplementationRegistry(DrillConfig.create()));
-    cg.addNextWrite(new ValueVectorWriteExpression(-1, materializedExpr));
+    cg.addExpr(new ValueVectorWriteExpression(-1, materializedExpr));
     return cg.generate();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
index d125ec0..c6434f7 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
@@ -5,6 +5,7 @@ import org.apache.drill.exec.physical.impl.ScreenCreator.ScreenRoot;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
+import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.vector.ValueVector;
 
 public class SimpleRootExec implements RootExec{
@@ -21,6 +22,9 @@ public class SimpleRootExec implements RootExec{
     
   }
 
+  public SelectionVector2 getSelectionVector2(){
+    return incoming.getSelectionVector2();
+  }
 
   public <T extends ValueVector> T getValueVectorById(SchemaPath path, Class<?> vvClass){
     TypedFieldId tfid = incoming.getValueVectorId(path);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
new file mode 100644
index 0000000..df11aa7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
@@ -0,0 +1,58 @@
+package org.apache.drill.exec.physical.impl.filter;
+
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.SimpleRootExec;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.junit.After;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.yammer.metrics.MetricRegistry;
+
+public class TestSimpleFilter {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSimpleFilter.class);
+  DrillConfig c = DrillConfig.create();
+  
+  
+  @Test
+  public void project(@Injectable final DrillbitContext bitContext, @Injectable UserClientConnection connection) throws Exception{
+    System.out.println(System.getProperty("java.class.path"));
+
+    
+    new NonStrictExpectations(){{
+      bitContext.getMetrics(); result = new MetricRegistry("test");
+      bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+    }};
+    
+    
+    PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+    PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/filter/test1.json"), Charsets.UTF_8));
+    FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+    FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+    SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+    while(exec.next()){
+      System.out.println(exec.getSelectionVector2().getCount());
+    }
+  }
+  
+  @After
+  public void tearDown() throws Exception{
+    // pause to get logger to catch up.
+    Thread.sleep(1000);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
index 925faf7..68b8881 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
@@ -31,7 +31,7 @@ import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
 import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
 import org.junit.Test;
 
-import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
 
 public class ExpressionTreeMaterializerTest {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
index 7c9e8f4..4a0358e 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
@@ -24,7 +24,8 @@ import org.apache.drill.exec.vector.ValueVector;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.Lists;
+
 
 public class JSONRecordReaderTest {
   private static final Charset UTF_8 = Charset.forName("UTF-8");

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json b/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json
new file mode 100644
index 0000000..a892c70
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json
@@ -0,0 +1,34 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+	graph:[
+        {
+            @id:1,
+            pop:"mock-scan",
+            url: "http://apache.org",
+            entries:[
+            	{records: 100, types: [
+            	  {name: "blue", type: "INT", mode: "REQUIRED"},
+            	  {name: "red", type: "BIGINT", mode: "REQUIRED"},
+            	  {name: "green", type: "INT", mode: "REQUIRED"}
+            	]}
+            ]
+        },
+        {
+            @id:2,
+            child: 1,
+            pop:"filter",
+            expr: "true"
+        },
+        {
+            @id: 3,
+            child: 2,
+            pop: "screen"
+        }
+    ]
+}
\ No newline at end of file


Re: [51/53] [abbrv] git commit: Initial working filter operator

Posted by Ted Dunning <te...@gmail.com>.
On Fri, Jul 19, 2013 at 6:58 PM, <ja...@apache.org> wrote:

> Initial working filter operator
>
>
> Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
> Commit:
> http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/65e2cfce
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/65e2cfce
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/65e2cfce
>
> Branch: refs/heads/master
> Commit: 65e2cfce5364ccfe30b83b9dddeb304b79efbc76
> Parents: 57de7ed
> Author: Jacques Nadeau <ja...@apache.org>
> Authored: Wed Jul 17 00:27:31 2013 -0700
> Committer: Jacques Nadeau <ja...@apache.org>
> Committed: Fri Jul 19 14:53:31 2013 -0700
>
> ----------------------------------------------------------------------
>  .../org/apache/drill/exec/cache/HazelCache.java |   1 -
>  .../drill/exec/compile/ClassTransformer.java    |  17 +-
>  .../drill/exec/compile/JDKClassCompiler.java    |   5 +-
>  .../exec/compile/TemplateClassDefinition.java   |  24 +--
>  .../apache/drill/exec/expr/CodeGenerator.java   |  20 +-
>  .../drill/exec/expr/EvaluationVisitor.java      |  22 +-
>  .../drill/exec/expr/fn/FunctionConverter.java   |   2 +-
>  .../exec/expr/fn/MethodGrabbingVisitor.java     |   3 +-
>  .../apache/drill/exec/ops/FragmentContext.java  |   9 +-
>  .../drill/exec/physical/impl/ImplCreator.java   |  10 +-
>  .../drill/exec/physical/impl/ScanBatch.java     |  11 +-
>  .../exec/physical/impl/WireRecordBatch.java     |   8 +-
>  .../physical/impl/filter/ExampleFilter.java     | 111 ----------
>  .../impl/filter/FilterBatchCreator.java         |  23 +++
>  .../physical/impl/filter/FilterEvaluator.java   |  10 +
>  .../physical/impl/filter/FilterRecordBatch.java | 200 +++++++++++++++++++
>  .../physical/impl/filter/FilterTemplate.java    |  63 ++++--
>  .../exec/physical/impl/filter/Filterer.java     |  19 ++
>  .../impl/filter/ReturnValueExpression.java      |  39 ++++
>  .../SelectionVectorPopulationExpression.java    |  39 ----
>  .../physical/impl/project/ProjectEvaluator.java |   4 +-
>  .../impl/project/ProjectRecordBatch.java        |  11 +-
>  .../exec/physical/impl/project/Projector.java   |   2 +-
>  .../impl/project/ProjectorTemplate.java         |  10 +-
>  .../apache/drill/exec/record/RecordBatch.java   |   3 +-
>  .../drill/exec/record/RecordBatchLoader.java    |   4 +-
>  .../apache/drill/exec/record/SchemaBuilder.java |   2 +-
>  .../exec/record/selection/SelectionVector2.java |  36 +++-
>  .../drill/exec/store/JSONRecordReader.java      |   3 +-
>  .../exec/compile/TestClassTransformation.java   |   2 +-
>  .../apache/drill/exec/expr/ExpressionTest.java  |   2 +-
>  .../exec/physical/impl/SimpleRootExec.java      |   4 +
>  .../physical/impl/filter/TestSimpleFilter.java  |  58 ++++++
>  .../record/ExpressionTreeMaterializerTest.java  |   2 +-
>  .../drill/exec/store/JSONRecordReaderTest.java  |   3 +-
>  .../src/test/resources/filter/test1.json        |  34 ++++
>  36 files changed, 566 insertions(+), 250 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
> index f4fdbfa..fe4a212 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
> @@ -29,7 +29,6 @@ import
> org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
>  import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
>  import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
>
> -import com.beust.jcommander.internal.Lists;
>  import com.google.common.cache.Cache;
>  import com.google.common.cache.CacheBuilder;
>  import com.google.protobuf.InvalidProtocolBufferException;
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
> index 4bf6e7e..d7cde2a 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
> @@ -44,11 +44,11 @@ import org.objectweb.asm.tree.ClassNode;
>  import org.objectweb.asm.tree.FieldNode;
>  import org.objectweb.asm.tree.MethodNode;
>
> -import com.beust.jcommander.internal.Sets;
>  import com.google.common.base.Preconditions;
>  import com.google.common.cache.CacheBuilder;
>  import com.google.common.cache.CacheLoader;
>  import com.google.common.cache.LoadingCache;
> +import com.google.common.collect.Sets;
>  import com.google.common.io.Files;
>  import com.google.common.io.Resources;
>
> @@ -112,15 +112,16 @@ public class ClassTransformer {
>        String materializedClassName) throws ClassTransformationException {
>
>      try {
> -
> +      long t0 = System.nanoTime();
>        final byte[] implementationClass =
> classLoader.getClassByteCode(materializedClassName, entireClass);
>
>        // Get Template Class
>        final String templateClassName =
> templateDefinition.getTemplateClassName().replaceAll("\\.", File.separator);
>        final String templateClassPath = File.separator + templateClassName
> + ".class";
> +      long t1 = System.nanoTime();
>        final byte[] templateClass =
> getClassByteCodeFromPath(templateClassPath);
> -      int fileNum = new Random().nextInt(100);
> -      Files.write(templateClass, new
> File(String.format("/tmp/%d-template.class", fileNum)));
> +//      int fileNum = new Random().nextInt(100);
> +      //Files.write(templateClass, new
> File(String.format("/tmp/%d-template.class", fileNum)));
>        // Generate Merge Class
>
>        // Setup adapters for merging, remapping class names and class
> writing. This is done in reverse order of how they
> @@ -130,7 +131,11 @@ public class ClassTransformer {
>        RemapClasses remapper = new RemapClasses(oldTemplateSlashName,
> materializedSlashName);
>
>        {
> +
>          ClassNode impl = getClassNodeFromByteCode(implementationClass);
> +        long t2 = System.nanoTime();
> +        logger.debug("Compile {}, decode template {}", (t1 -
> t0)/1000/1000, (t2- t1)/1000/1000);
> +
>          ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES);
>
>          ClassVisitor remappingAdapter = new RemappingClassAdapter(cw,
> remapper);
> @@ -139,7 +144,7 @@ public class ClassTransformer {
>          ClassReader tReader = new ClassReader(templateClass);
>          tReader.accept(mergingAdapter, ClassReader.EXPAND_FRAMES);
>          byte[] outputClass = cw.toByteArray();
> -        Files.write(outputClass, new
> File(String.format("/tmp/%d-output.class", fileNum)));
> +//        Files.write(outputClass, new
> File(String.format("/tmp/%d-output.class", fileNum)));
>          outputClass = cw.toByteArray();
>
>          // Load the class
> @@ -160,7 +165,7 @@ public class ClassTransformer {
>          reader.accept(remap, ClassReader.EXPAND_FRAMES);
>          byte[] newByteCode = subcw.toByteArray();
>          classLoader.injectByteCode(s.replace(oldTemplateSlashName,
> materializedSlashName).replace('/', '.'), newByteCode);
> -        Files.write(subcw.toByteArray(), new
> File(String.format("/tmp/%d-sub-%d.class", fileNum, i)));
> +//        Files.write(subcw.toByteArray(), new
> File(String.format("/tmp/%d-sub-%d.class", fileNum, i)));
>          i++;
>        }
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JDKClassCompiler.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JDKClassCompiler.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JDKClassCompiler.java
> index 15e87fe..8f6e572 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JDKClassCompiler.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JDKClassCompiler.java
> @@ -30,9 +30,9 @@ import java.util.List;
>  import javax.tools.Diagnostic;
>  import javax.tools.DiagnosticListener;
>  import javax.tools.JavaCompiler;
> +import javax.tools.JavaCompiler.CompilationTask;
>  import javax.tools.JavaFileManager;
>  import javax.tools.JavaFileObject;
> -import javax.tools.JavaCompiler.CompilationTask;
>  import javax.tools.JavaFileObject.Kind;
>  import javax.tools.SimpleJavaFileObject;
>  import javax.tools.StandardLocation;
> @@ -43,7 +43,8 @@ import org.codehaus.commons.compiler.Location;
>  import org.codehaus.commons.compiler.jdk.ByteArrayJavaFileManager;
>  import
> org.codehaus.commons.compiler.jdk.ByteArrayJavaFileManager.ByteArrayJavaFileObject;
>
> -import com.beust.jcommander.internal.Lists;
> +import com.google.common.collect.Lists;
> +
>
>  class JDKClassCompiler implements ClassCompiler {
>    static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(JDKClassCompiler.class);
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
> index 5a01dce..20ef361 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
> @@ -17,7 +17,6 @@
>
> ******************************************************************************/
>  package org.apache.drill.exec.compile;
>
> -import java.lang.reflect.Method;
>
>
>  public class TemplateClassDefinition<T>{
> @@ -25,24 +24,19 @@ public class TemplateClassDefinition<T>{
>    private final Class<T> externalInterface;
>    private final String templateClassName;
>    private final Class<?> internalInterface;
> -  private final String setupName;
> -  private final String evalName;
> -
> +  private final Class<?> evalReturnType;
>
> -  public TemplateClassDefinition(Class<T> externalInterface, String
> templateClassName, Class<?> internalInterface, String setupName, String
> evalName) {
> +  public TemplateClassDefinition(Class<T> externalInterface, String
> templateClassName, Class<?> internalInterface, Class<?> evalReturnType) {
>      super();
>      this.externalInterface = externalInterface;
>      this.templateClassName = templateClassName;
>      this.internalInterface = internalInterface;
> -    this.setupName = setupName;
> -    this.evalName = evalName;
> +    this.evalReturnType = evalReturnType;
>    }
>
> -
>    public Class<T> getExternalInterface() {
>      return externalInterface;
>    }
> -
>
>    public Class<?> getInternalInterface() {
>      return internalInterface;
> @@ -52,16 +46,8 @@ public class TemplateClassDefinition<T>{
>      return templateClassName;
>    }
>
> -  public String getSetupName() {
> -    return setupName;
> +  public Class<?> getEvalReturnType() {
> +    return evalReturnType;
>    }
> -
> -
> -  public String getEvalName() {
> -    return evalName;
> -  }
> -
> -
> -
>
>  }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
> index ed6bd9b..241c1cc 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
> @@ -1,10 +1,13 @@
>  package org.apache.drill.exec.expr;
>
>  import java.io.IOException;
> +import java.lang.reflect.Method;
>
> +import org.apache.drill.common.expression.LogicalExpression;
>  import org.apache.drill.common.types.TypeProtos.DataMode;
>  import org.apache.drill.common.types.TypeProtos.MajorType;
>  import org.apache.drill.exec.compile.TemplateClassDefinition;
> +import org.apache.drill.exec.exception.SchemaChangeException;
>  import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
>  import org.apache.drill.exec.expr.holders.BooleanHolder;
>  import org.apache.drill.exec.expr.holders.IntHolder;
> @@ -36,8 +39,6 @@ public class CodeGenerator<T> {
>    private JBlock currentEvalBlock;
>    private JBlock currentSetupBlock;
>    private final EvaluationVisitor evaluationVisitor;
> -  private final String setupName;
> -  private final String perRecordName;
>    private final TemplateClassDefinition<T> definition;
>    private JCodeModel model;
>    private int index = 0;
> @@ -46,8 +47,6 @@ public class CodeGenerator<T> {
>      super();
>      try{
>        this.definition = definition;
> -      this.setupName = definition.getSetupName();
> -      this.perRecordName = definition.getEvalName();
>        this.model = new JCodeModel();
>        this.clazz =
> model._package("org.apache.drill.exec.test.generated")._class("Test1");
>        clazz._implements(definition.getInternalInterface());
> @@ -59,7 +58,7 @@ public class CodeGenerator<T> {
>      }
>    }
>
> -  public void addNextWrite(ValueVectorWriteExpression ex){
> +  public void addExpr(LogicalExpression ex){
>      logger.debug("Adding next write {}", ex);
>      currentEvalBlock = new JBlock();
>      parentEvalBlock.add(currentEvalBlock);
> @@ -80,20 +79,27 @@ public class CodeGenerator<T> {
>      return currentSetupBlock;
>    }
>
> +
> +  public TemplateClassDefinition<T> getDefinition() {
> +    return definition;
> +  }
> +
>    public String generate() throws IOException{
>
>      {
>        //setup method
> -      JMethod m = clazz.method(JMod.PUBLIC, model.VOID, this.setupName);
> +      JMethod m = clazz.method(JMod.PUBLIC, model.VOID, "doSetup");
>        m.param(model._ref(FragmentContext.class), "context");
>        m.param(model._ref(RecordBatch.class), "incoming");
>        m.param(model._ref(RecordBatch.class), "outgoing");
> +      m._throws(SchemaChangeException.class);
>        m.body().add(parentSetupBlock);
>      }
>
>      {
>        // eval method.
> -      JMethod m = clazz.method(JMod.PUBLIC, model.VOID,
> this.perRecordName);
> +      JType ret = definition.getEvalReturnType() == null ? model.VOID :
> model._ref(definition.getEvalReturnType());
> +      JMethod m = clazz.method(JMod.PUBLIC, ret, "doEval");
>        m.param(model.INT, "inIndex");
>        m.param(model.INT, "outIndex");
>        m.body().add(parentEvalBlock);
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
> index 6b0e499..c219d9c 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
> @@ -14,7 +14,7 @@ import org.apache.drill.common.types.Types;
>  import org.apache.drill.exec.expr.CodeGenerator.HoldingContainer;
>  import org.apache.drill.exec.expr.fn.FunctionHolder;
>  import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
> -import
> org.apache.drill.exec.physical.impl.filter.SelectionVectorPopulationExpression;
> +import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
>  import org.apache.drill.exec.record.selection.SelectionVector2;
>  import org.apache.drill.exec.vector.TypeHelper;
>
> @@ -117,7 +117,7 @@ public class EvaluationVisitor extends
> AbstractExprVisitor<HoldingContainer, Cod
>    @Override
>    public HoldingContainer visitBooleanConstant(BooleanExpression e,
> CodeGenerator<?> generator) throws RuntimeException {
>      HoldingContainer out = generator.declare(e.getMajorType());
> -    generator.getBlock().assign(out.getValue(),
> JExpr.lit(e.getBoolean()));
> +    generator.getBlock().assign(out.getValue(), JExpr.lit(e.getBoolean()
> ? 1 : 0));
>      return out;
>    }
>
> @@ -127,8 +127,8 @@ public class EvaluationVisitor extends
> AbstractExprVisitor<HoldingContainer, Cod
>        return visitValueVectorExpression((ValueVectorReadExpression) e,
> generator);
>      }else if(e instanceof ValueVectorWriteExpression){
>        return visitValueVectorWriteExpression((ValueVectorWriteExpression)
> e, generator);
> -    }else if(e instanceof SelectionVectorPopulationExpression){
> -      return
> visitSelectionVectorExpression((SelectionVectorPopulationExpression) e,
> generator);
> +    }else if(e instanceof ReturnValueExpression){
> +      return visitReturnValueExpression((ReturnValueExpression) e,
> generator);
>      }else{
>        return super.visitUnknown(e, generator);
>      }
> @@ -196,21 +196,11 @@ public class EvaluationVisitor extends
> AbstractExprVisitor<HoldingContainer, Cod
>    }
>
>
> -  private HoldingContainer
> visitSelectionVectorExpression(SelectionVectorPopulationExpression e,
> CodeGenerator<?> generator){
> -    JType svClass = generator.getModel()._ref(SelectionVector2.class);
> -    JVar sv = generator.declareClassField("sv", svClass);
> -    JVar index = generator.declareClassField("svIndex",
> generator.getModel().CHAR);
> +  private HoldingContainer
> visitReturnValueExpression(ReturnValueExpression e, CodeGenerator<?>
> generator){
>      LogicalExpression child = e.getChild();
>
>  Preconditions.checkArgument(child.getMajorType().equals(Types.REQUIRED_BOOLEAN));
>      HoldingContainer hc = child.accept(this, generator);
> -    generator.getBlock()._return(hc.getValue());
> -
> -//    JBlock blk = generator.getSetupBlock();
> -//    blk.assign(sv,
> JExpr.direct("outgoing").invoke("getSelectionVector2"));
> -//    JConditional jc = blk._if(hc.getValue());
> -//    JBlock body = jc._then();
> -//    body.add(sv.invoke("set").arg(index).arg(JExpr.direct("inIndex")));
> -//    body.assign(index, index.plus(JExpr.lit(1)));
> +    generator.getBlock()._return(hc.getValue().eq(JExpr.lit(1)));
>      return null;
>    }
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
> index 84f04f0..8e0f1be 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
> @@ -25,7 +25,7 @@ import org.codehaus.janino.Parser;
>  import org.codehaus.janino.Scanner;
>  import org.mortbay.util.IO;
>
> -import com.beust.jcommander.internal.Lists;
> +import com.google.common.collect.Lists;
>  import com.google.common.io.InputSupplier;
>  import com.google.common.io.Resources;
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/MethodGrabbingVisitor.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/MethodGrabbingVisitor.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/MethodGrabbingVisitor.java
> index 22b9046..57268ee 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/MethodGrabbingVisitor.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/MethodGrabbingVisitor.java
> @@ -8,7 +8,8 @@ import org.codehaus.janino.Java.ClassDeclaration;
>  import org.codehaus.janino.Java.MethodDeclarator;
>  import org.codehaus.janino.util.Traverser;
>
> -import com.beust.jcommander.internal.Maps;
> +import com.google.common.collect.Maps;
> +
>
>  public class MethodGrabbingVisitor{
>    static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(MethodGrabbingVisitor.class);
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
> index 876b873..2ae4afa 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
> @@ -103,11 +103,14 @@ public class FragmentContext {
>      return context.getAllocator();
>    }
>
> -  public <T> T getImplementationClass(TemplateClassDefinition<T>
> templateDefinition, CodeGenerator<T> cg) throws
> ClassTransformationException, IOException{
> -    return transformer.getImplementationClass(this.loader,
> templateDefinition, cg.generate(), cg.getMaterializedClassName());
> +  public <T> T getImplementationClass(CodeGenerator<T> cg) throws
> ClassTransformationException, IOException{
> +    long t1 = System.nanoTime();
> +    T t= transformer.getImplementationClass(this.loader,
> cg.getDefinition(), cg.generate(), cg.getMaterializedClassName());
> +    System.out.println( (System.nanoTime() - t1)/1000/1000 );
> +    return t;
> +
>    }
>
> -
>    public void addMetricsToStatus(FragmentStatus.Builder stats){
>      stats.setBatchesCompleted(batchesCompleted.get());
>      stats.setDataProcessed(dataProcessed.get());
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
> index 739c0d4..f96d6f3 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
> @@ -26,12 +26,14 @@ import
> org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
>  import org.apache.drill.exec.physical.base.FragmentRoot;
>  import org.apache.drill.exec.physical.base.PhysicalOperator;
>  import org.apache.drill.exec.physical.base.Scan;
> +import org.apache.drill.exec.physical.config.Filter;
>  import org.apache.drill.exec.physical.config.MockScanBatchCreator;
>  import org.apache.drill.exec.physical.config.MockScanPOP;
>  import org.apache.drill.exec.physical.config.Project;
>  import org.apache.drill.exec.physical.config.RandomReceiver;
>  import org.apache.drill.exec.physical.config.Screen;
>  import org.apache.drill.exec.physical.config.SingleSender;
> +import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator;
>  import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator;
>  import org.apache.drill.exec.record.RecordBatch;
>
> @@ -46,6 +48,7 @@ public class ImplCreator extends
> AbstractPhysicalVisitor<RecordBatch, FragmentCo
>    private RandomReceiverCreator rrc = new RandomReceiverCreator();
>    private SingleSenderCreator ssc = new SingleSenderCreator();
>    private ProjectBatchCreator pbc = new ProjectBatchCreator();
> +  private FilterBatchCreator fbc = new FilterBatchCreator();
>    private RootExec root = null;
>
>    private ImplCreator(){}
> @@ -78,10 +81,13 @@ public class ImplCreator extends
> AbstractPhysicalVisitor<RecordBatch, FragmentCo
>      root = sc.getRoot(context, op, getChildren(op, context));
>      return null;
>    }
> -
> -
>
>    @Override
> +  public RecordBatch visitFilter(Filter filter, FragmentContext context)
> throws ExecutionSetupException {
> +    return fbc.getBatch(context, filter, getChildren(filter, context));
> +  }
> +
> +  @Override
>    public RecordBatch visitSingleSender(SingleSender op, FragmentContext
> context) throws ExecutionSetupException {
>      root = ssc.getRoot(context, op, getChildren(op, context));
>      return null;
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
> index 5688bb1..084db54 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
> @@ -26,7 +26,6 @@ import org.apache.drill.common.expression.SchemaPath;
>  import org.apache.drill.exec.exception.SchemaChangeException;
>  import org.apache.drill.exec.ops.FragmentContext;
>  import org.apache.drill.exec.record.BatchSchema;
> -import org.apache.drill.exec.record.InvalidValueAccessor;
>  import org.apache.drill.exec.record.MaterializedField;
>  import org.apache.drill.exec.record.RecordBatch;
>  import org.apache.drill.exec.record.SchemaBuilder;
> @@ -36,9 +35,8 @@ import
> org.apache.drill.exec.record.selection.SelectionVector4;
>  import org.apache.drill.exec.store.RecordReader;
>  import org.apache.drill.exec.vector.ValueVector;
>
> -import com.beust.jcommander.internal.Lists;
> -import com.beust.jcommander.internal.Maps;
> -import com.carrotsearch.hppc.procedures.IntObjectProcedure;
> +import com.google.common.collect.Lists;
> +import com.google.common.collect.Maps;
>
>  /**
>   * Record batch used for a particular scan. Operators against one or more
> @@ -171,6 +169,11 @@ public class ScanBatch implements RecordBatch {
>    }
>
>    @Override
> +  public Iterator<ValueVector> iterator() {
> +    return vectors.iterator();
> +  }
> +
> +  @Override
>    public WritableBatch getWritableBatch() {
>      return WritableBatch.get(this.getRecordCount(), vectors);
>    }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
> index d2b8bfd..a575f69 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
> @@ -17,6 +17,8 @@
>
> ******************************************************************************/
>  package org.apache.drill.exec.physical.impl;
>
> +import java.util.Iterator;
> +
>  import org.apache.drill.common.expression.SchemaPath;
>  import org.apache.drill.exec.exception.SchemaChangeException;
>  import org.apache.drill.exec.ops.FragmentContext;
> @@ -66,9 +68,13 @@ public class WireRecordBatch implements RecordBatch{
>    public void kill() {
>      fragProvider.kill(context);
>    }
> -
>
>    @Override
> +  public Iterator<ValueVector> iterator() {
> +    return batchLoader.iterator();
> +  }
> +
> +  @Override
>    public SelectionVector2 getSelectionVector2() {
>      throw new UnsupportedOperationException();
>    }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ExampleFilter.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ExampleFilter.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ExampleFilter.java
> deleted file mode 100644
> index 85f598f..0000000
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ExampleFilter.java
> +++ /dev/null
> @@ -1,111 +0,0 @@
>
> -/*******************************************************************************
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership.  The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License.  You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> -
> ******************************************************************************/
> -
> -package org.apache.drill.exec.physical.impl.filter;
> -
> -import org.apache.drill.common.expression.SchemaPath;
> -import org.apache.drill.exec.exception.SchemaChangeException;
> -import org.apache.drill.exec.ops.FragmentContext;
> -import org.apache.drill.exec.record.BatchSchema;
> -import org.apache.drill.exec.record.RecordBatch;
> -import org.apache.drill.exec.record.WritableBatch;
> -import org.apache.drill.exec.record.selection.SelectionVector2;
> -import org.apache.drill.exec.record.selection.SelectionVector4;
> -import org.apache.drill.exec.vector.ValueVector;
> -
> -public class ExampleFilter implements RecordBatch {
> -  static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(ExampleFilter.class);
> -
> -  //private EvalutationPredicates []
> -  private RecordBatch incoming;
> -  private BatchSchema outboundSchema;
> -  private int recordCount;
> -
> -  private void reconfigureSchema() throws SchemaChangeException {
> -    BatchSchema in = incoming.getSchema();
> -    outboundSchema =
> BatchSchema.newBuilder().addFields(in).setSelectionVectorMode(BatchSchema.SelectionVectorMode.TWO_BYTE).build();
> -  }
> -
> -  private int generateSelectionVector(){
> -                    return -1;
> -  }
> -
> -  @Override
> -  public FragmentContext getContext() {
> -    return incoming.getContext();
> -  }
> -
> -  @Override
> -  public BatchSchema getSchema() {
> -    return outboundSchema;
> -  }
> -
> -  @Override
> -  public int getRecordCount() {
> -    return recordCount;  //To change body of implemented methods use File
> | Settings | File Templates.
> -  }
> -
> -  @Override
> -  public void kill() {
> -    //To change body of implemented methods use File | Settings | File
> Templates.
> -  }
> -
> -  @Override
> -  public SelectionVector2 getSelectionVector2() {
> -    return null;
> -  }
> -
> -  @Override
> -  public SelectionVector4 getSelectionVector4() {
> -    return null;
> -  }
> -
> -  @Override
> -  public TypedFieldId getValueVectorId(SchemaPath path) {
> -    return null;
> -  }
> -
> -  @Override
> -  public <T extends ValueVector> T getValueVectorById(int fieldId,
> Class<?> vvClass) {
> -    return null;
> -  }
> -
> -  @Override
> -  public IterOutcome next() {
> -    IterOutcome out = incoming.next();
> -    switch (incoming.next()) {
> -
> -      case NONE:
> -        return IterOutcome.NONE;
> -      case OK_NEW_SCHEMA:
> -        //reconfigureSchema();
> -      case OK:
> -        this.recordCount = generateSelectionVector();
> -        return out;
> -      case STOP:
> -        return IterOutcome.STOP;
> -      default:
> -        throw new UnsupportedOperationException();
> -    }
> -  }
> -
> -  @Override
> -  public WritableBatch getWritableBatch() {
> -    return null;  //To change body of implemented methods use File |
> Settings | File Templates.
> -  }
> -}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
> new file mode 100644
> index 0000000..df2518b
> --- /dev/null
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
> @@ -0,0 +1,23 @@
> +package org.apache.drill.exec.physical.impl.filter;
> +
> +import java.util.List;
> +
> +import org.apache.drill.common.exceptions.ExecutionSetupException;
> +import org.apache.drill.exec.ops.FragmentContext;
> +import org.apache.drill.exec.physical.config.Filter;
> +import org.apache.drill.exec.physical.impl.BatchCreator;
> +import org.apache.drill.exec.record.RecordBatch;
> +
> +import com.google.common.base.Preconditions;
> +
> +public class FilterBatchCreator implements BatchCreator<Filter>{
> +  static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(FilterBatchCreator.class);
> +
> +  @Override
> +  public RecordBatch getBatch(FragmentContext context, Filter config,
> List<RecordBatch> children) throws ExecutionSetupException {
> +    Preconditions.checkArgument(children.size() == 1);
> +    return new FilterRecordBatch(config, children.iterator().next(),
> context);
> +  }
> +
> +
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterEvaluator.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterEvaluator.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterEvaluator.java
> new file mode 100644
> index 0000000..0fad224
> --- /dev/null
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterEvaluator.java
> @@ -0,0 +1,10 @@
> +package org.apache.drill.exec.physical.impl.filter;
> +
> +import org.apache.drill.exec.exception.SchemaChangeException;
> +import org.apache.drill.exec.ops.FragmentContext;
> +import org.apache.drill.exec.record.RecordBatch;
> +
> +public interface FilterEvaluator {
> +  public void doSetup(FragmentContext context, RecordBatch incoming,
> RecordBatch outgoing) throws SchemaChangeException;
> +  public boolean doEval(int inIndex, int outIndex);
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
> new file mode 100644
> index 0000000..fc9dbc6
> --- /dev/null
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
> @@ -0,0 +1,200 @@
> +package org.apache.drill.exec.physical.impl.filter;
> +
> +import java.io.IOException;
> +import java.util.Iterator;
> +import java.util.List;
> +
> +import org.apache.drill.common.expression.ErrorCollector;
> +import org.apache.drill.common.expression.ErrorCollectorImpl;
> +import org.apache.drill.common.expression.FieldReference;
> +import org.apache.drill.common.expression.LogicalExpression;
> +import org.apache.drill.common.expression.PathSegment;
> +import org.apache.drill.common.expression.SchemaPath;
> +import org.apache.drill.common.logical.data.NamedExpression;
> +import org.apache.drill.common.types.TypeProtos.MajorType;
> +import org.apache.drill.exec.exception.ClassTransformationException;
> +import org.apache.drill.exec.exception.SchemaChangeException;
> +import org.apache.drill.exec.expr.CodeGenerator;
> +import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
> +import org.apache.drill.exec.expr.ValueVectorReadExpression;
> +import org.apache.drill.exec.expr.ValueVectorWriteExpression;
> +import org.apache.drill.exec.ops.FragmentContext;
> +import org.apache.drill.exec.physical.config.Filter;
> +import org.apache.drill.exec.physical.config.Project;
> +import org.apache.drill.exec.physical.impl.VectorHolder;
> +import org.apache.drill.exec.physical.impl.project.Projector;
> +import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
> +import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
> +import org.apache.drill.exec.proto.SchemaDefProtos.NamePart.Type;
> +import org.apache.drill.exec.record.BatchSchema;
> +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
> +import org.apache.drill.exec.record.MaterializedField;
> +import org.apache.drill.exec.record.RecordBatch;
> +import org.apache.drill.exec.record.SchemaBuilder;
> +import org.apache.drill.exec.record.TransferPair;
> +import org.apache.drill.exec.record.WritableBatch;
> +import org.apache.drill.exec.record.selection.SelectionVector2;
> +import org.apache.drill.exec.record.selection.SelectionVector4;
> +import org.apache.drill.exec.vector.AllocationHelper;
> +import org.apache.drill.exec.vector.NonRepeatedMutator;
> +import org.apache.drill.exec.vector.TypeHelper;
> +import org.apache.drill.exec.vector.ValueVector;
> +
> +import com.google.common.base.Preconditions;
> +import com.google.common.collect.Lists;
> +
> +public class FilterRecordBatch implements RecordBatch{
> +  static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
> +
> +  private final Filter filterConfig;
> +  private final RecordBatch incoming;
> +  private final FragmentContext context;
> +  private final SelectionVector2 sv;
> +  private BatchSchema outSchema;
> +  private Filterer filter;
> +  private List<ValueVector> outputVectors;
> +  private VectorHolder vh;
> +
> +  public FilterRecordBatch(Filter pop, RecordBatch incoming,
> FragmentContext context){
> +    this.filterConfig = pop;
> +    this.incoming = incoming;
> +    this.context = context;
> +    sv = new SelectionVector2(context.getAllocator());
> +  }
> +
> +
> +  @Override
> +  public FragmentContext getContext() {
> +    return context;
> +  }
> +
> +  @Override
> +  public BatchSchema getSchema() {
> +    Preconditions.checkNotNull(outSchema);
> +    return outSchema;
> +  }
> +
> +  @Override
> +  public int getRecordCount() {
> +    return sv.getCount();
> +  }
> +
> +  @Override
> +  public void kill() {
> +    incoming.kill();
> +  }
> +
> +
> +  @Override
> +  public Iterator<ValueVector> iterator() {
> +    return outputVectors.iterator();
> +  }
> +
> +  @Override
> +  public SelectionVector2 getSelectionVector2() {
> +    return sv;
> +  }
> +
> +  @Override
> +  public SelectionVector4 getSelectionVector4() {
> +    throw new UnsupportedOperationException();
> +  }
> +
> +  @Override
> +  public TypedFieldId getValueVectorId(SchemaPath path) {
> +    return vh.getValueVector(path);
> +  }
> +
> +  @Override
> +  public <T extends ValueVector> T getValueVectorById(int fieldId,
> Class<?> clazz) {
> +    return vh.getValueVector(fieldId, clazz);
> +  }
> +
> +  @Override
> +  public IterOutcome next() {
> +
> +    IterOutcome upstream = incoming.next();
> +    logger.debug("Upstream... {}", upstream);
> +    switch(upstream){
> +    case NONE:
> +    case NOT_YET:
> +    case STOP:
> +      return upstream;
> +    case OK_NEW_SCHEMA:
> +      try{
> +        filter = createNewFilterer();
> +      }catch(SchemaChangeException ex){
> +        incoming.kill();
> +        logger.error("Failure during query", ex);
> +        context.fail(ex);
> +        return IterOutcome.STOP;
> +      }
> +      // fall through.
> +    case OK:
> +      int recordCount = incoming.getRecordCount();
> +      sv.allocateNew(recordCount);
> +      filter.filterBatch(recordCount);
> +      for(ValueVector v : this.outputVectors){
> +        ValueVector.Mutator m = v.getMutator();
> +        if(m instanceof NonRepeatedMutator){
> +          ((NonRepeatedMutator) m).setValueCount(recordCount);
> +        }else{
> +          throw new UnsupportedOperationException();
> +        }
> +      }
> +      return upstream; // change if upstream changed, otherwise normal.
> +    default:
> +      throw new UnsupportedOperationException();
> +    }
> +  }
> +
> +
> +  private Filterer createNewFilterer() throws SchemaChangeException{
> +    if(outputVectors != null){
> +      for(ValueVector v : outputVectors){
> +        v.close();
> +      }
> +    }
> +    this.outputVectors = Lists.newArrayList();
> +    this.vh = new VectorHolder(outputVectors);
> +    LogicalExpression filterExpression = filterConfig.getExpr();
> +    final ErrorCollector collector = new ErrorCollectorImpl();
> +    final List<TransferPair> transfers = Lists.newArrayList();
> +    final CodeGenerator<Filterer> cg = new
> CodeGenerator<Filterer>(Filterer.TEMPLATE_DEFINITION,
> context.getFunctionRegistry());
> +
> +    final LogicalExpression expr =
> ExpressionTreeMaterializer.materialize(filterExpression, incoming,
> collector);
> +    if(collector.hasErrors()){
> +      throw new SchemaChangeException(String.format("Failure while trying
> to materialize incoming schema.  Errors:\n %s.",
> collector.toErrorString()));
> +    }
> +
> +    cg.addExpr(new ReturnValueExpression(expr));
> +
> +    for(ValueVector v : incoming){
> +      TransferPair pair = v.getTransferPair();
> +      outputVectors.add(pair.getTo());
> +      transfers.add(pair);
> +    }
> +
> +    SchemaBuilder bldr =
> BatchSchema.newBuilder().setSelectionVectorMode(SelectionVectorMode.TWO_BYTE);
> +    for(ValueVector v : outputVectors){
> +      bldr.addField(v.getField());
> +    }
> +    this.outSchema = bldr.build();
> +
> +    try {
> +      TransferPair[] tx = transfers.toArray(new
> TransferPair[transfers.size()]);
> +      Filterer filterer = context.getImplementationClass(cg);
> +      filterer.setup(context, incoming, this, tx);
> +      return filterer;
> +    } catch (ClassTransformationException | IOException e) {
> +      throw new SchemaChangeException("Failure while attempting to load
> generated class", e);
> +    }
> +  }
> +
> +  @Override
> +  public WritableBatch getWritableBatch() {
> +    return WritableBatch.get(sv.getCount(), outputVectors);
> +  }
> +
> +
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
> index 216bfec..4092911 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
> @@ -1,18 +1,27 @@
>  package org.apache.drill.exec.physical.impl.filter;
>
> +import org.apache.drill.exec.exception.SchemaChangeException;
> +import org.apache.drill.exec.ops.FragmentContext;
> +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
>  import org.apache.drill.exec.record.RecordBatch;
> +import org.apache.drill.exec.record.TransferPair;
>  import org.apache.drill.exec.record.selection.SelectionVector2;
>
> -public abstract class FilterTemplate {
> +public abstract class FilterTemplate implements Filterer{
>    static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(FilterTemplate.class);
>
> -  SelectionVector2 outgoingSelectionVector;
> -  SelectionVector2 incomingSelectionVector;
> +  private SelectionVector2 outgoingSelectionVector;
> +  private SelectionVector2 incomingSelectionVector;
> +  private SelectionVectorMode svMode;
> +  private TransferPair[] transfers;
>
> -  public void setup(RecordBatch incoming, RecordBatch outgoing){
> -    outgoingSelectionVector = outgoing.getSelectionVector2();
> -
> -    switch(incoming.getSchema().getSelectionVector()){
> +  @Override
> +  public void setup(FragmentContext context, RecordBatch incoming,
> RecordBatch outgoing, TransferPair[] transfers) throws
> SchemaChangeException{
> +    this.transfers = transfers;
> +    this.outgoingSelectionVector = outgoing.getSelectionVector2();
> +    this.svMode = incoming.getSchema().getSelectionVector();
> +
> +    switch(svMode){
>      case NONE:
>        break;
>      case TWO_BYTE:
> @@ -21,28 +30,54 @@ public abstract class FilterTemplate {
>      default:
>        throw new UnsupportedOperationException();
>      }
> +    doSetup(context, incoming, outgoing);
> +  }
> +
> +  private void doTransfers(){
> +    for(TransferPair t : transfers){
> +      t.transfer();
> +    }
>    }
>
> -  public void filterBatchSV2(int recordCount){
> +  public void filterBatch(int recordCount){
> +    doTransfers();
> +    switch(svMode){
> +    case NONE:
> +      filterBatchNoSV(recordCount);
> +      break;
> +    case TWO_BYTE:
> +      filterBatchSV2(recordCount);
> +      break;
> +    default:
> +      throw new UnsupportedOperationException();
> +    }
> +  }
> +
> +  private void filterBatchSV2(int recordCount){
>      int svIndex = 0;
> -    for(char i =0; i < recordCount; i++){
> -      if(include(i)){
> -        outgoingSelectionVector.setIndex(svIndex, i);
> +    final int count = recordCount*2;
> +    for(int i = 0; i < count; i+=2){
> +      char index = incomingSelectionVector.getIndex(i);
> +      if(doEval(i, 0)){
> +        outgoingSelectionVector.setIndex(svIndex, index);
>          svIndex+=2;
>        }
>      }
> +    outgoingSelectionVector.setRecordCount(svIndex/2);
>    }
>
> -  public void filterBatchNoSV(int recordCount){
> +  private void filterBatchNoSV(int recordCount){
>      int svIndex = 0;
>      for(char i =0; i < recordCount; i++){
>
> -      if(include(i)){
> +      if(doEval(i, 0)){
>          outgoingSelectionVector.setIndex(svIndex, i);
>          svIndex+=2;
>        }
>      }
> +    outgoingSelectionVector.setRecordCount(svIndex/2);
>    }
>
> -  protected abstract boolean include(int index);
> +  protected abstract void doSetup(FragmentContext context, RecordBatch
> incoming, RecordBatch outgoing) throws SchemaChangeException;
> +  protected abstract boolean doEval(int inIndex, int outIndex);
>  }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
> new file mode 100644
> index 0000000..b270869
> --- /dev/null
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
> @@ -0,0 +1,19 @@
> +package org.apache.drill.exec.physical.impl.filter;
> +
> +import org.apache.drill.exec.compile.TemplateClassDefinition;
> +import org.apache.drill.exec.exception.SchemaChangeException;
> +import org.apache.drill.exec.ops.FragmentContext;
> +import org.apache.drill.exec.physical.impl.project.Projector;
> +import org.apache.drill.exec.record.RecordBatch;
> +import org.apache.drill.exec.record.TransferPair;
> +
> +public interface Filterer {
> +  static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(Filterer.class);
> +
> +  public void setup(FragmentContext context, RecordBatch incoming,
> RecordBatch outgoing, TransferPair[] transfers) throws
> SchemaChangeException;
> +  public void filterBatch(int recordCount);
> +
> +  public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION =
> new TemplateClassDefinition<Filterer>( //
> +      Filterer.class,
> "org.apache.drill.exec.physical.impl.filter.FilterTemplate",
> FilterEvaluator.class, boolean.class);
> +
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ReturnValueExpression.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ReturnValueExpression.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ReturnValueExpression.java
> new file mode 100644
> index 0000000..a794d63
> --- /dev/null
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ReturnValueExpression.java
> @@ -0,0 +1,39 @@
> +package org.apache.drill.exec.physical.impl.filter;
> +
> +import org.apache.drill.common.expression.ExpressionPosition;
> +import org.apache.drill.common.expression.LogicalExpression;
> +import org.apache.drill.common.expression.visitors.ExprVisitor;
> +import org.apache.drill.common.types.Types;
> +import org.apache.drill.common.types.TypeProtos.MajorType;
> +
> +public class ReturnValueExpression implements LogicalExpression{
> +  static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(ReturnValueExpression.class);
> +
> +  private LogicalExpression child;
> +
> +  public ReturnValueExpression(LogicalExpression child) {
> +    this.child = child;
> +  }
> +
> +  public LogicalExpression getChild() {
> +    return child;
> +  }
> +
> +  @Override
> +  public MajorType getMajorType() {
> +    return Types.NULL;
> +  }
> +
> +  @Override
> +  public <T, V, E extends Exception> T accept(ExprVisitor<T, V, E>
> visitor, V value) throws E {
> +    return visitor.visitUnknown(this, value);
> +  }
> +
> +  @Override
> +  public ExpressionPosition getPosition() {
> +    return ExpressionPosition.UNKNOWN;
> +  }
> +
> +
> +
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/SelectionVectorPopulationExpression.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/SelectionVectorPopulationExpression.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/SelectionVectorPopulationExpression.java
> deleted file mode 100644
> index f253695..0000000
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/SelectionVectorPopulationExpression.java
> +++ /dev/null
> @@ -1,39 +0,0 @@
> -package org.apache.drill.exec.physical.impl.filter;
> -
> -import org.apache.drill.common.expression.ExpressionPosition;
> -import org.apache.drill.common.expression.LogicalExpression;
> -import org.apache.drill.common.expression.visitors.ExprVisitor;
> -import org.apache.drill.common.types.Types;
> -import org.apache.drill.common.types.TypeProtos.MajorType;
> -
> -public class SelectionVectorPopulationExpression implements
> LogicalExpression{
> -  static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(SelectionVectorPopulationExpression.class);
> -
> -  private LogicalExpression child;
> -
> -  public SelectionVectorPopulationExpression(LogicalExpression child) {
> -    this.child = child;
> -  }
> -
> -  public LogicalExpression getChild() {
> -    return child;
> -  }
> -
> -  @Override
> -  public MajorType getMajorType() {
> -    return Types.NULL;
> -  }
> -
> -  @Override
> -  public <T, V, E extends Exception> T accept(ExprVisitor<T, V, E>
> visitor, V value) throws E {
> -    return visitor.visitUnknown(this, value);
> -  }
> -
> -  @Override
> -  public ExpressionPosition getPosition() {
> -    return ExpressionPosition.UNKNOWN;
> -  }
> -
> -
> -
> -}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
> index 86caf28..5fd1fb4 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
> @@ -7,6 +7,6 @@ import org.apache.drill.exec.record.RecordBatch;
>  public interface ProjectEvaluator {
>    static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(ProjectEvaluator.class);
>
> -  public abstract void setupEvaluators(FragmentContext context,
> RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
> -  public abstract void doPerRecordWork(int inIndex, int outIndex);
> +  public abstract void doSetup(FragmentContext context, RecordBatch
> incoming, RecordBatch outgoing) throws SchemaChangeException;
> +  public abstract void doEval(int inIndex, int outIndex);
>  }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
> index 3d1e3f7..060cd92 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
> @@ -1,6 +1,7 @@
>  package org.apache.drill.exec.physical.impl.project;
>
>  import java.io.IOException;
> +import java.util.Iterator;
>  import java.util.List;
>
>  import org.apache.drill.common.expression.ErrorCollector;
> @@ -59,7 +60,11 @@ public class ProjectRecordBatch implements RecordBatch{
>      this.context = context;
>    }
>
> -
> +  @Override
> +  public Iterator<ValueVector> iterator() {
> +    return outputVectors.iterator();
> +  }
> +
>    @Override
>    public FragmentContext getContext() {
>      return context;
> @@ -180,7 +185,7 @@ public class ProjectRecordBatch implements RecordBatch{
>          allocationVectors.add(vector);
>          outputVectors.add(vector);
>          ValueVectorWriteExpression write = new
> ValueVectorWriteExpression(outputVectors.size() - 1, expr);
> -        cg.addNextWrite(write);
> +        cg.addExpr(write);
>        }
>
>      }
> @@ -192,7 +197,7 @@ public class ProjectRecordBatch implements RecordBatch{
>      this.outSchema = bldr.build();
>
>      try {
> -      Projector projector =
> context.getImplementationClass(Projector.TEMPLATE_DEFINITION, cg);
> +      Projector projector = context.getImplementationClass(cg);
>        projector.setup(context, incoming, this, transfers);
>        return projector;
>      } catch (ClassTransformationException | IOException e) {
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
> index 2787f0c..0d1e201 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
> @@ -16,6 +16,6 @@ public interface Projector {
>    public abstract int projectRecords(int recordCount, int
> firstOutputIndex);
>
>    public static TemplateClassDefinition<Projector> TEMPLATE_DEFINITION =
> new TemplateClassDefinition<Projector>( //
> -      Projector.class,
> "org.apache.drill.exec.physical.impl.project.ProjectorTemplate",
> ProjectEvaluator.class, "setupEvaluators", "doPerRecordWork");
> +      Projector.class,
> "org.apache.drill.exec.physical.impl.project.ProjectorTemplate",
> ProjectEvaluator.class, null);
>
>  }
> \ No newline at end of file
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
> index 486c7b0..735d355 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
> @@ -33,7 +33,7 @@ public abstract class ProjectorTemplate implements
> Projector {
>      case TWO_BYTE:
>        final int count = recordCount*2;
>        for(int i = 0; i < count; i+=2, firstOutputIndex++){
> -        doPerRecordWork(vector2.getIndex(i), firstOutputIndex);
> +        doEval(vector2.getIndex(i), firstOutputIndex);
>        }
>        return recordCount;
>
> @@ -45,7 +45,7 @@ public abstract class ProjectorTemplate implements
> Projector {
>        }
>        final int countN = recordCount;
>        for (int i = 0; i < countN; i++, firstOutputIndex++) {
> -        doPerRecordWork(i, firstOutputIndex);
> +        doEval(i, firstOutputIndex);
>        }
>        return recordCount;
>
> @@ -68,11 +68,11 @@ public abstract class ProjectorTemplate implements
> Projector {
>        break;
>      }
>      this.transfers = ImmutableList.copyOf(transfers);
> -    setupEvaluators(context, incoming, outgoing);
> +    setupEval(context, incoming, outgoing);
>    }
>
> -  protected abstract void setupEvaluators(FragmentContext context,
> RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
> -  protected abstract void doPerRecordWork(int inIndex, int outIndex);
> +  protected abstract void setupEval(FragmentContext context, RecordBatch
> incoming, RecordBatch outgoing) throws SchemaChangeException;
> +  protected abstract void doEval(int inIndex, int outIndex);
>
>
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
> index 650a148..ff856d4 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
> @@ -29,7 +29,7 @@ import org.apache.drill.exec.vector.ValueVector;
>   * composed of ValueVectors, ideally a batch fits within L2 cache (~256k
> per core). The set of value vectors do not
>   * change unless the next() IterOutcome is a *_NEW_SCHEMA type.
>   */
> -public interface RecordBatch {
> +public interface RecordBatch extends Iterable<ValueVector>{
>
>    /**
>     * Describes the outcome of a RecordBatch being incremented forward.
> @@ -88,6 +88,7 @@ public interface RecordBatch {
>    public abstract TypedFieldId getValueVectorId(SchemaPath path);
>
>
> +
>    public abstract <T extends ValueVector> T getValueVectorById(int
> fieldId, Class<?> clazz);
>
>    /**
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
> index e2a1648..5f7648b 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
> @@ -33,9 +33,9 @@ import
> org.apache.drill.exec.record.RecordBatch.TypedFieldId;
>  import org.apache.drill.exec.vector.TypeHelper;
>  import org.apache.drill.exec.vector.ValueVector;
>
> -import com.beust.jcommander.internal.Lists;
> -import com.beust.jcommander.internal.Maps;
>  import com.google.common.collect.ImmutableList;
> +import com.google.common.collect.Lists;
> +import com.google.common.collect.Maps;
>
>  public class RecordBatchLoader implements Iterable<ValueVector>{
>    static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(RecordBatchLoader.class);
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
> index 0989c1d..34e4043 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
> @@ -23,8 +23,8 @@ import java.util.Set;
>  import org.apache.drill.exec.exception.SchemaChangeException;
>  import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
>
> -import com.beust.jcommander.internal.Sets;
>  import com.google.common.collect.Lists;
> +import com.google.common.collect.Sets;
>
>  /**
>   * A reusable builder that supports the creation of BatchSchemas. Can
> have a supporting expected object. If the expected Schema object is
> defined, the
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
> index cdc136e..88f0c79 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
> @@ -19,16 +19,20 @@ package org.apache.drill.exec.record.selection;
>
>  import io.netty.buffer.ByteBuf;
>
> +import java.io.Closeable;
> +import java.io.IOException;
> +
>  import org.apache.drill.exec.memory.BufferAllocator;
>  import org.apache.drill.exec.record.DeadBuf;
>
>  /**
>   * A selection vector that fronts, at most, 64K values (each index is stored as a 2-byte char).
>   */
> -public class SelectionVector2{
> +public class SelectionVector2 implements Closeable{
>    static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(SelectionVector2.class);
>
>    private final BufferAllocator allocator;
> +  private int recordCount;
>    private ByteBuf buffer = DeadBuf.DEAD_BUFFER;
>
>    public SelectionVector2(BufferAllocator allocator) {
> @@ -36,14 +40,40 @@ public class SelectionVector2{
>    }
>
>    public int getCount(){
> -    return -1;
> +    return recordCount;
>    }
>
> -  public int getIndex(int directIndex){
> +  public char getIndex(int directIndex){
>      return buffer.getChar(directIndex);
>    }
>
>    public void setIndex(int directIndex, char value){
>      buffer.setChar(directIndex, value);
>    }
> +
> +  public void allocateNew(int size){
> +    clear();
> +    buffer = allocator.buffer(size * 2);
> +  }
> +
> +
> +  public void clear() {
> +    if (buffer != DeadBuf.DEAD_BUFFER) {
> +      buffer.release();
> +      buffer = DeadBuf.DEAD_BUFFER;
> +      recordCount = 0;
> +    }
> +  }
> +
> +  public void setRecordCount(int recordCount){
> +    logger.debug("Seting record count to {}", recordCount);
> +    this.recordCount = recordCount;
> +  }
> +
> +  @Override
> +  public void close() throws IOException {
> +    clear();
> +  }
> +
> +
>  }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
> index 8513dfe..3a57410 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
> @@ -39,13 +39,14 @@ import
> org.apache.drill.exec.vector.NullableVarChar4Vector;
>  import org.apache.drill.exec.vector.TypeHelper;
>  import org.apache.drill.exec.vector.ValueVector;
>
> -import com.beust.jcommander.internal.Maps;
> +
>  import com.fasterxml.jackson.core.JsonFactory;
>  import com.fasterxml.jackson.core.JsonParser;
>  import com.fasterxml.jackson.core.JsonToken;
>  import com.google.common.base.Charsets;
>  import com.google.common.collect.Iterables;
>  import com.google.common.collect.Lists;
> +import com.google.common.collect.Maps;
>  import com.google.common.io.Files;
>  import com.google.common.io.InputSupplier;
>  import com.google.common.io.Resources;
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
> index cb1e1d6..d2889ed 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
> +++
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
> @@ -40,7 +40,7 @@ public class TestClassTransformation {
>
>      TemplateClassDefinition<ExampleExternalInterface> def = new
> TemplateClassDefinition<ExampleExternalInterface>(
>          ExampleExternalInterface.class,
> "org.apache.drill.exec.compile.ExampleTemplate",
> -        ExampleInternalInterface.class, "a", "b");
> +        ExampleInternalInterface.class, null);
>
>
>      ClassTransformer ct = new ClassTransformer();
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
> index 623af0e..c610374 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
> +++
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
> @@ -92,7 +92,7 @@ public class ExpressionTest {
>      }
>
>      CodeGenerator<Projector> cg = new
> CodeGenerator<Projector>(Projector.TEMPLATE_DEFINITION, new
> FunctionImplementationRegistry(DrillConfig.create()));
> -    cg.addNextWrite(new ValueVectorWriteExpression(-1, materializedExpr));
> +    cg.addExpr(new ValueVectorWriteExpression(-1, materializedExpr));
>      return cg.generate();
>    }
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
> index d125ec0..c6434f7 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
> +++
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
> @@ -5,6 +5,7 @@ import
> org.apache.drill.exec.physical.impl.ScreenCreator.ScreenRoot;
>  import org.apache.drill.exec.record.RecordBatch;
>  import org.apache.drill.exec.record.RecordBatch.IterOutcome;
>  import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
> +import org.apache.drill.exec.record.selection.SelectionVector2;
>  import org.apache.drill.exec.vector.ValueVector;
>
>  public class SimpleRootExec implements RootExec{
> @@ -21,6 +22,9 @@ public class SimpleRootExec implements RootExec{
>
>    }
>
> +  public SelectionVector2 getSelectionVector2(){
> +    return incoming.getSelectionVector2();
> +  }
>
>    public <T extends ValueVector> T getValueVectorById(SchemaPath path,
> Class<?> vvClass){
>      TypedFieldId tfid = incoming.getValueVectorId(path);
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
> new file mode 100644
> index 0000000..df11aa7
> --- /dev/null
> +++
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
> @@ -0,0 +1,58 @@
> +package org.apache.drill.exec.physical.impl.filter;
> +
> +import mockit.Injectable;
> +import mockit.NonStrictExpectations;
> +
> +import org.apache.drill.common.config.DrillConfig;
> +import org.apache.drill.common.util.FileUtils;
> +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
> +import org.apache.drill.exec.memory.BufferAllocator;
> +import org.apache.drill.exec.ops.FragmentContext;
> +import org.apache.drill.exec.physical.PhysicalPlan;
> +import org.apache.drill.exec.physical.base.FragmentRoot;
> +import org.apache.drill.exec.physical.impl.ImplCreator;
> +import org.apache.drill.exec.physical.impl.SimpleRootExec;
> +import org.apache.drill.exec.planner.PhysicalPlanReader;
> +import org.apache.drill.exec.proto.CoordinationProtos;
> +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
> +import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
> +import org.apache.drill.exec.server.DrillbitContext;
> +import org.junit.After;
> +import org.junit.Test;
> +
> +import com.google.common.base.Charsets;
> +import com.google.common.io.Files;
> +import com.yammer.metrics.MetricRegistry;
> +
> +public class TestSimpleFilter {
> +  static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(TestSimpleFilter.class);
> +  DrillConfig c = DrillConfig.create();
> +
> +
> +  @Test
> +  public void project(@Injectable final DrillbitContext bitContext,
> @Injectable UserClientConnection connection) throws Exception{
> +    System.out.println(System.getProperty("java.class.path"));
> +
> +
> +    new NonStrictExpectations(){{
> +      bitContext.getMetrics(); result = new MetricRegistry("test");
> +      bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
> +    }};
> +
> +
> +    PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(),
> CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
> +    PhysicalPlan plan =
> reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/filter/test1.json"),
> Charsets.UTF_8));
> +    FunctionImplementationRegistry registry = new
> FunctionImplementationRegistry(c);
> +    FragmentContext context = new FragmentContext(bitContext,
> FragmentHandle.getDefaultInstance(), connection, null, registry);
> +    SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context,
> (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
> +    while(exec.next()){
> +      System.out.println(exec.getSelectionVector2().getCount());
> +    }
> +  }
> +
> +  @After
> +  public void tearDown() throws Exception{
> +    // pause to get logger to catch up.
> +    Thread.sleep(1000);
> +  }
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
> index 925faf7..68b8881 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
> +++
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
> @@ -31,7 +31,7 @@ import
> org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
>  import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
>  import org.junit.Test;
>
> -import com.beust.jcommander.internal.Lists;
> +import com.google.common.collect.Lists;
>  import com.google.common.collect.Range;
>
>  public class ExpressionTreeMaterializerTest {
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
> index 7c9e8f4..4a0358e 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
> +++
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
> @@ -24,7 +24,8 @@ import org.apache.drill.exec.vector.ValueVector;
>  import org.junit.Ignore;
>  import org.junit.Test;
>
> -import com.beust.jcommander.internal.Lists;
> +import com.google.common.collect.Lists;
> +
>
>  public class JSONRecordReaderTest {
>    private static final Charset UTF_8 = Charset.forName("UTF-8");
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json
> b/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json
> new file mode 100644
> index 0000000..a892c70
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json
> @@ -0,0 +1,34 @@
> +{
> +    head:{
> +        type:"APACHE_DRILL_PHYSICAL",
> +        version:"1",
> +        generator:{
> +            type:"manual"
> +        }
> +    },
> +       graph:[
> +        {
> +            @id:1,
> +            pop:"mock-scan",
> +            url: "http://apache.org",
> +            entries:[
> +               {records: 100, types: [
> +                 {name: "blue", type: "INT", mode: "REQUIRED"},
> +                 {name: "red", type: "BIGINT", mode: "REQUIRED"},
> +                 {name: "green", type: "INT", mode: "REQUIRED"}
> +               ]}
> +            ]
> +        },
> +        {
> +            @id:2,
> +            child: 1,
> +            pop:"filter",
> +            expr: "true"
> +        },
> +        {
> +            @id: 3,
> +            child: 2,
> +            pop: "screen"
> +        }
> +    ]
> +}
> \ No newline at end of file
>
>