You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/07/20 03:58:12 UTC
[51/53] [abbrv] git commit: Initial working filter operator
Initial working filter operator
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/65e2cfce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/65e2cfce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/65e2cfce
Branch: refs/heads/master
Commit: 65e2cfce5364ccfe30b83b9dddeb304b79efbc76
Parents: 57de7ed
Author: Jacques Nadeau <ja...@apache.org>
Authored: Wed Jul 17 00:27:31 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Fri Jul 19 14:53:31 2013 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/cache/HazelCache.java | 1 -
.../drill/exec/compile/ClassTransformer.java | 17 +-
.../drill/exec/compile/JDKClassCompiler.java | 5 +-
.../exec/compile/TemplateClassDefinition.java | 24 +--
.../apache/drill/exec/expr/CodeGenerator.java | 20 +-
.../drill/exec/expr/EvaluationVisitor.java | 22 +-
.../drill/exec/expr/fn/FunctionConverter.java | 2 +-
.../exec/expr/fn/MethodGrabbingVisitor.java | 3 +-
.../apache/drill/exec/ops/FragmentContext.java | 9 +-
.../drill/exec/physical/impl/ImplCreator.java | 10 +-
.../drill/exec/physical/impl/ScanBatch.java | 11 +-
.../exec/physical/impl/WireRecordBatch.java | 8 +-
.../physical/impl/filter/ExampleFilter.java | 111 ----------
.../impl/filter/FilterBatchCreator.java | 23 +++
.../physical/impl/filter/FilterEvaluator.java | 10 +
.../physical/impl/filter/FilterRecordBatch.java | 200 +++++++++++++++++++
.../physical/impl/filter/FilterTemplate.java | 63 ++++--
.../exec/physical/impl/filter/Filterer.java | 19 ++
.../impl/filter/ReturnValueExpression.java | 39 ++++
.../SelectionVectorPopulationExpression.java | 39 ----
.../physical/impl/project/ProjectEvaluator.java | 4 +-
.../impl/project/ProjectRecordBatch.java | 11 +-
.../exec/physical/impl/project/Projector.java | 2 +-
.../impl/project/ProjectorTemplate.java | 10 +-
.../apache/drill/exec/record/RecordBatch.java | 3 +-
.../drill/exec/record/RecordBatchLoader.java | 4 +-
.../apache/drill/exec/record/SchemaBuilder.java | 2 +-
.../exec/record/selection/SelectionVector2.java | 36 +++-
.../drill/exec/store/JSONRecordReader.java | 3 +-
.../exec/compile/TestClassTransformation.java | 2 +-
.../apache/drill/exec/expr/ExpressionTest.java | 2 +-
.../exec/physical/impl/SimpleRootExec.java | 4 +
.../physical/impl/filter/TestSimpleFilter.java | 58 ++++++
.../record/ExpressionTreeMaterializerTest.java | 2 +-
.../drill/exec/store/JSONRecordReaderTest.java | 3 +-
.../src/test/resources/filter/test1.json | 34 ++++
36 files changed, 566 insertions(+), 250 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
index f4fdbfa..fe4a212 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
@@ -29,7 +29,6 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
-import com.beust.jcommander.internal.Lists;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.protobuf.InvalidProtocolBufferException;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
index 4bf6e7e..d7cde2a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
@@ -44,11 +44,11 @@ import org.objectweb.asm.tree.ClassNode;
import org.objectweb.asm.tree.FieldNode;
import org.objectweb.asm.tree.MethodNode;
-import com.beust.jcommander.internal.Sets;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.google.common.io.Resources;
@@ -112,15 +112,16 @@ public class ClassTransformer {
String materializedClassName) throws ClassTransformationException {
try {
-
+ long t0 = System.nanoTime();
final byte[] implementationClass = classLoader.getClassByteCode(materializedClassName, entireClass);
// Get Template Class
final String templateClassName = templateDefinition.getTemplateClassName().replaceAll("\\.", File.separator);
final String templateClassPath = File.separator + templateClassName + ".class";
+ long t1 = System.nanoTime();
final byte[] templateClass = getClassByteCodeFromPath(templateClassPath);
- int fileNum = new Random().nextInt(100);
- Files.write(templateClass, new File(String.format("/tmp/%d-template.class", fileNum)));
+// int fileNum = new Random().nextInt(100);
+ //Files.write(templateClass, new File(String.format("/tmp/%d-template.class", fileNum)));
// Generate Merge Class
// Setup adapters for merging, remapping class names and class writing. This is done in reverse order of how they
@@ -130,7 +131,11 @@ public class ClassTransformer {
RemapClasses remapper = new RemapClasses(oldTemplateSlashName, materializedSlashName);
{
+
ClassNode impl = getClassNodeFromByteCode(implementationClass);
+ long t2 = System.nanoTime();
+ logger.debug("Compile {}, decode template {}", (t1 - t0)/1000/1000, (t2- t1)/1000/1000);
+
ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES);
ClassVisitor remappingAdapter = new RemappingClassAdapter(cw, remapper);
@@ -139,7 +144,7 @@ public class ClassTransformer {
ClassReader tReader = new ClassReader(templateClass);
tReader.accept(mergingAdapter, ClassReader.EXPAND_FRAMES);
byte[] outputClass = cw.toByteArray();
- Files.write(outputClass, new File(String.format("/tmp/%d-output.class", fileNum)));
+// Files.write(outputClass, new File(String.format("/tmp/%d-output.class", fileNum)));
outputClass = cw.toByteArray();
// Load the class
@@ -160,7 +165,7 @@ public class ClassTransformer {
reader.accept(remap, ClassReader.EXPAND_FRAMES);
byte[] newByteCode = subcw.toByteArray();
classLoader.injectByteCode(s.replace(oldTemplateSlashName, materializedSlashName).replace('/', '.'), newByteCode);
- Files.write(subcw.toByteArray(), new File(String.format("/tmp/%d-sub-%d.class", fileNum, i)));
+// Files.write(subcw.toByteArray(), new File(String.format("/tmp/%d-sub-%d.class", fileNum, i)));
i++;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JDKClassCompiler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JDKClassCompiler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JDKClassCompiler.java
index 15e87fe..8f6e572 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JDKClassCompiler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JDKClassCompiler.java
@@ -30,9 +30,9 @@ import java.util.List;
import javax.tools.Diagnostic;
import javax.tools.DiagnosticListener;
import javax.tools.JavaCompiler;
+import javax.tools.JavaCompiler.CompilationTask;
import javax.tools.JavaFileManager;
import javax.tools.JavaFileObject;
-import javax.tools.JavaCompiler.CompilationTask;
import javax.tools.JavaFileObject.Kind;
import javax.tools.SimpleJavaFileObject;
import javax.tools.StandardLocation;
@@ -43,7 +43,8 @@ import org.codehaus.commons.compiler.Location;
import org.codehaus.commons.compiler.jdk.ByteArrayJavaFileManager;
import org.codehaus.commons.compiler.jdk.ByteArrayJavaFileManager.ByteArrayJavaFileObject;
-import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.Lists;
+
class JDKClassCompiler implements ClassCompiler {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JDKClassCompiler.class);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
index 5a01dce..20ef361 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
@@ -17,7 +17,6 @@
******************************************************************************/
package org.apache.drill.exec.compile;
-import java.lang.reflect.Method;
public class TemplateClassDefinition<T>{
@@ -25,24 +24,19 @@ public class TemplateClassDefinition<T>{
private final Class<T> externalInterface;
private final String templateClassName;
private final Class<?> internalInterface;
- private final String setupName;
- private final String evalName;
-
+ private final Class<?> evalReturnType;
- public TemplateClassDefinition(Class<T> externalInterface, String templateClassName, Class<?> internalInterface, String setupName, String evalName) {
+ public TemplateClassDefinition(Class<T> externalInterface, String templateClassName, Class<?> internalInterface, Class<?> evalReturnType) {
super();
this.externalInterface = externalInterface;
this.templateClassName = templateClassName;
this.internalInterface = internalInterface;
- this.setupName = setupName;
- this.evalName = evalName;
+ this.evalReturnType = evalReturnType;
}
-
public Class<T> getExternalInterface() {
return externalInterface;
}
-
public Class<?> getInternalInterface() {
return internalInterface;
@@ -52,16 +46,8 @@ public class TemplateClassDefinition<T>{
return templateClassName;
}
- public String getSetupName() {
- return setupName;
+ public Class<?> getEvalReturnType() {
+ return evalReturnType;
}
-
-
- public String getEvalName() {
- return evalName;
- }
-
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
index ed6bd9b..241c1cc 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
@@ -1,10 +1,13 @@
package org.apache.drill.exec.expr;
import java.io.IOException;
+import java.lang.reflect.Method;
+import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.expr.holders.BooleanHolder;
import org.apache.drill.exec.expr.holders.IntHolder;
@@ -36,8 +39,6 @@ public class CodeGenerator<T> {
private JBlock currentEvalBlock;
private JBlock currentSetupBlock;
private final EvaluationVisitor evaluationVisitor;
- private final String setupName;
- private final String perRecordName;
private final TemplateClassDefinition<T> definition;
private JCodeModel model;
private int index = 0;
@@ -46,8 +47,6 @@ public class CodeGenerator<T> {
super();
try{
this.definition = definition;
- this.setupName = definition.getSetupName();
- this.perRecordName = definition.getEvalName();
this.model = new JCodeModel();
this.clazz = model._package("org.apache.drill.exec.test.generated")._class("Test1");
clazz._implements(definition.getInternalInterface());
@@ -59,7 +58,7 @@ public class CodeGenerator<T> {
}
}
- public void addNextWrite(ValueVectorWriteExpression ex){
+ public void addExpr(LogicalExpression ex){
logger.debug("Adding next write {}", ex);
currentEvalBlock = new JBlock();
parentEvalBlock.add(currentEvalBlock);
@@ -80,20 +79,27 @@ public class CodeGenerator<T> {
return currentSetupBlock;
}
+
+ public TemplateClassDefinition<T> getDefinition() {
+ return definition;
+ }
+
public String generate() throws IOException{
{
//setup method
- JMethod m = clazz.method(JMod.PUBLIC, model.VOID, this.setupName);
+ JMethod m = clazz.method(JMod.PUBLIC, model.VOID, "doSetup");
m.param(model._ref(FragmentContext.class), "context");
m.param(model._ref(RecordBatch.class), "incoming");
m.param(model._ref(RecordBatch.class), "outgoing");
+ m._throws(SchemaChangeException.class);
m.body().add(parentSetupBlock);
}
{
// eval method.
- JMethod m = clazz.method(JMod.PUBLIC, model.VOID, this.perRecordName);
+ JType ret = definition.getEvalReturnType() == null ? model.VOID : model._ref(definition.getEvalReturnType());
+ JMethod m = clazz.method(JMod.PUBLIC, ret, "doEval");
m.param(model.INT, "inIndex");
m.param(model.INT, "outIndex");
m.body().add(parentEvalBlock);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
index 6b0e499..c219d9c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
@@ -14,7 +14,7 @@ import org.apache.drill.common.types.Types;
import org.apache.drill.exec.expr.CodeGenerator.HoldingContainer;
import org.apache.drill.exec.expr.fn.FunctionHolder;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
-import org.apache.drill.exec.physical.impl.filter.SelectionVectorPopulationExpression;
+import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.vector.TypeHelper;
@@ -117,7 +117,7 @@ public class EvaluationVisitor extends AbstractExprVisitor<HoldingContainer, Cod
@Override
public HoldingContainer visitBooleanConstant(BooleanExpression e, CodeGenerator<?> generator) throws RuntimeException {
HoldingContainer out = generator.declare(e.getMajorType());
- generator.getBlock().assign(out.getValue(), JExpr.lit(e.getBoolean()));
+ generator.getBlock().assign(out.getValue(), JExpr.lit(e.getBoolean() ? 1 : 0));
return out;
}
@@ -127,8 +127,8 @@ public class EvaluationVisitor extends AbstractExprVisitor<HoldingContainer, Cod
return visitValueVectorExpression((ValueVectorReadExpression) e, generator);
}else if(e instanceof ValueVectorWriteExpression){
return visitValueVectorWriteExpression((ValueVectorWriteExpression) e, generator);
- }else if(e instanceof SelectionVectorPopulationExpression){
- return visitSelectionVectorExpression((SelectionVectorPopulationExpression) e, generator);
+ }else if(e instanceof ReturnValueExpression){
+ return visitReturnValueExpression((ReturnValueExpression) e, generator);
}else{
return super.visitUnknown(e, generator);
}
@@ -196,21 +196,11 @@ public class EvaluationVisitor extends AbstractExprVisitor<HoldingContainer, Cod
}
- private HoldingContainer visitSelectionVectorExpression(SelectionVectorPopulationExpression e, CodeGenerator<?> generator){
- JType svClass = generator.getModel()._ref(SelectionVector2.class);
- JVar sv = generator.declareClassField("sv", svClass);
- JVar index = generator.declareClassField("svIndex", generator.getModel().CHAR);
+ private HoldingContainer visitReturnValueExpression(ReturnValueExpression e, CodeGenerator<?> generator){
LogicalExpression child = e.getChild();
Preconditions.checkArgument(child.getMajorType().equals(Types.REQUIRED_BOOLEAN));
HoldingContainer hc = child.accept(this, generator);
- generator.getBlock()._return(hc.getValue());
-
-// JBlock blk = generator.getSetupBlock();
-// blk.assign(sv, JExpr.direct("outgoing").invoke("getSelectionVector2"));
-// JConditional jc = blk._if(hc.getValue());
-// JBlock body = jc._then();
-// body.add(sv.invoke("set").arg(index).arg(JExpr.direct("inIndex")));
-// body.assign(index, index.plus(JExpr.lit(1)));
+ generator.getBlock()._return(hc.getValue().eq(JExpr.lit(1)));
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
index 84f04f0..8e0f1be 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
@@ -25,7 +25,7 @@ import org.codehaus.janino.Parser;
import org.codehaus.janino.Scanner;
import org.mortbay.util.IO;
-import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.Lists;
import com.google.common.io.InputSupplier;
import com.google.common.io.Resources;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/MethodGrabbingVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/MethodGrabbingVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/MethodGrabbingVisitor.java
index 22b9046..57268ee 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/MethodGrabbingVisitor.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/MethodGrabbingVisitor.java
@@ -8,7 +8,8 @@ import org.codehaus.janino.Java.ClassDeclaration;
import org.codehaus.janino.Java.MethodDeclarator;
import org.codehaus.janino.util.Traverser;
-import com.beust.jcommander.internal.Maps;
+import com.google.common.collect.Maps;
+
public class MethodGrabbingVisitor{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MethodGrabbingVisitor.class);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 876b873..2ae4afa 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -103,11 +103,14 @@ public class FragmentContext {
return context.getAllocator();
}
- public <T> T getImplementationClass(TemplateClassDefinition<T> templateDefinition, CodeGenerator<T> cg) throws ClassTransformationException, IOException{
- return transformer.getImplementationClass(this.loader, templateDefinition, cg.generate(), cg.getMaterializedClassName());
+ public <T> T getImplementationClass(CodeGenerator<T> cg) throws ClassTransformationException, IOException{
+ long t1 = System.nanoTime();
+ T t= transformer.getImplementationClass(this.loader, cg.getDefinition(), cg.generate(), cg.getMaterializedClassName());
+ System.out.println( (System.nanoTime() - t1)/1000/1000 );
+ return t;
+
}
-
public void addMetricsToStatus(FragmentStatus.Builder stats){
stats.setBatchesCompleted(batchesCompleted.get());
stats.setDataProcessed(dataProcessed.get());
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 739c0d4..f96d6f3 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -26,12 +26,14 @@ import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Scan;
+import org.apache.drill.exec.physical.config.Filter;
import org.apache.drill.exec.physical.config.MockScanBatchCreator;
import org.apache.drill.exec.physical.config.MockScanPOP;
import org.apache.drill.exec.physical.config.Project;
import org.apache.drill.exec.physical.config.RandomReceiver;
import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.physical.config.SingleSender;
+import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator;
import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator;
import org.apache.drill.exec.record.RecordBatch;
@@ -46,6 +48,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
private RandomReceiverCreator rrc = new RandomReceiverCreator();
private SingleSenderCreator ssc = new SingleSenderCreator();
private ProjectBatchCreator pbc = new ProjectBatchCreator();
+ private FilterBatchCreator fbc = new FilterBatchCreator();
private RootExec root = null;
private ImplCreator(){}
@@ -78,10 +81,13 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
root = sc.getRoot(context, op, getChildren(op, context));
return null;
}
-
-
@Override
+ public RecordBatch visitFilter(Filter filter, FragmentContext context) throws ExecutionSetupException {
+ return fbc.getBatch(context, filter, getChildren(filter, context));
+ }
+
+ @Override
public RecordBatch visitSingleSender(SingleSender op, FragmentContext context) throws ExecutionSetupException {
root = ssc.getRoot(context, op, getChildren(op, context));
return null;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 5688bb1..084db54 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -26,7 +26,6 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.InvalidValueAccessor;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.SchemaBuilder;
@@ -36,9 +35,8 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.vector.ValueVector;
-import com.beust.jcommander.internal.Lists;
-import com.beust.jcommander.internal.Maps;
-import com.carrotsearch.hppc.procedures.IntObjectProcedure;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
/**
* Record batch used for a particular scan. Operators against one or more
@@ -171,6 +169,11 @@ public class ScanBatch implements RecordBatch {
}
@Override
+ public Iterator<ValueVector> iterator() {
+ return vectors.iterator();
+ }
+
+ @Override
public WritableBatch getWritableBatch() {
return WritableBatch.get(this.getRecordCount(), vectors);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
index d2b8bfd..a575f69 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -17,6 +17,8 @@
******************************************************************************/
package org.apache.drill.exec.physical.impl;
+import java.util.Iterator;
+
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
@@ -66,9 +68,13 @@ public class WireRecordBatch implements RecordBatch{
public void kill() {
fragProvider.kill(context);
}
-
@Override
+ public Iterator<ValueVector> iterator() {
+ return batchLoader.iterator();
+ }
+
+ @Override
public SelectionVector2 getSelectionVector2() {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ExampleFilter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ExampleFilter.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ExampleFilter.java
deleted file mode 100644
index 85f598f..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ExampleFilter.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-
-package org.apache.drill.exec.physical.impl.filter;
-
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.ValueVector;
-
-public class ExampleFilter implements RecordBatch {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExampleFilter.class);
-
- //private EvalutationPredicates []
- private RecordBatch incoming;
- private BatchSchema outboundSchema;
- private int recordCount;
-
- private void reconfigureSchema() throws SchemaChangeException {
- BatchSchema in = incoming.getSchema();
- outboundSchema = BatchSchema.newBuilder().addFields(in).setSelectionVectorMode(BatchSchema.SelectionVectorMode.TWO_BYTE).build();
- }
-
- private int generateSelectionVector(){
- return -1;
- }
-
- @Override
- public FragmentContext getContext() {
- return incoming.getContext();
- }
-
- @Override
- public BatchSchema getSchema() {
- return outboundSchema;
- }
-
- @Override
- public int getRecordCount() {
- return recordCount; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public void kill() {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public SelectionVector2 getSelectionVector2() {
- return null;
- }
-
- @Override
- public SelectionVector4 getSelectionVector4() {
- return null;
- }
-
- @Override
- public TypedFieldId getValueVectorId(SchemaPath path) {
- return null;
- }
-
- @Override
- public <T extends ValueVector> T getValueVectorById(int fieldId, Class<?> vvClass) {
- return null;
- }
-
- @Override
- public IterOutcome next() {
- IterOutcome out = incoming.next();
- switch (incoming.next()) {
-
- case NONE:
- return IterOutcome.NONE;
- case OK_NEW_SCHEMA:
- //reconfigureSchema();
- case OK:
- this.recordCount = generateSelectionVector();
- return out;
- case STOP:
- return IterOutcome.STOP;
- default:
- throw new UnsupportedOperationException();
- }
- }
-
- @Override
- public WritableBatch getWritableBatch() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
new file mode 100644
index 0000000..df2518b
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
@@ -0,0 +1,23 @@
+package org.apache.drill.exec.physical.impl.filter;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import com.google.common.base.Preconditions;
+
+public class FilterBatchCreator implements BatchCreator<Filter>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterBatchCreator.class);
+
+ @Override
+ public RecordBatch getBatch(FragmentContext context, Filter config, List<RecordBatch> children) throws ExecutionSetupException {
+ Preconditions.checkArgument(children.size() == 1);
+ return new FilterRecordBatch(config, children.iterator().next(), context);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterEvaluator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterEvaluator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterEvaluator.java
new file mode 100644
index 0000000..0fad224
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterEvaluator.java
@@ -0,0 +1,10 @@
+package org.apache.drill.exec.physical.impl.filter;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+
+public interface FilterEvaluator {
+ public void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
+ public boolean doEval(int inIndex, int outIndex);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
new file mode 100644
index 0000000..fc9dbc6
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -0,0 +1,200 @@
+package org.apache.drill.exec.physical.impl.filter;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.ValueVectorReadExpression;
+import org.apache.drill.exec.expr.ValueVectorWriteExpression;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.impl.VectorHolder;
+import org.apache.drill.exec.physical.impl.project.Projector;
+import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
+import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
+import org.apache.drill.exec.proto.SchemaDefProtos.NamePart.Type;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.SchemaBuilder;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.NonRepeatedMutator;
+import org.apache.drill.exec.vector.TypeHelper;
+import org.apache.drill.exec.vector.ValueVector;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class FilterRecordBatch implements RecordBatch{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
+
+ private final Filter filterConfig;
+ private final RecordBatch incoming;
+ private final FragmentContext context;
+ private final SelectionVector2 sv;
+ private BatchSchema outSchema;
+ private Filterer filter;
+ private List<ValueVector> outputVectors;
+ private VectorHolder vh;
+
+ public FilterRecordBatch(Filter pop, RecordBatch incoming, FragmentContext context){
+ this.filterConfig = pop;
+ this.incoming = incoming;
+ this.context = context;
+ sv = new SelectionVector2(context.getAllocator());
+ }
+
+
+ @Override
+ public FragmentContext getContext() {
+ return context;
+ }
+
+ @Override
+ public BatchSchema getSchema() {
+ Preconditions.checkNotNull(outSchema);
+ return outSchema;
+ }
+
+ @Override
+ public int getRecordCount() {
+ return sv.getCount();
+ }
+
+ @Override
+ public void kill() {
+ incoming.kill();
+ }
+
+
+ @Override
+ public Iterator<ValueVector> iterator() {
+ return outputVectors.iterator();
+ }
+
+ @Override
+ public SelectionVector2 getSelectionVector2() {
+ return sv;
+ }
+
+ @Override
+ public SelectionVector4 getSelectionVector4() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TypedFieldId getValueVectorId(SchemaPath path) {
+ return vh.getValueVector(path);
+ }
+
+ @Override
+ public <T extends ValueVector> T getValueVectorById(int fieldId, Class<?> clazz) {
+ return vh.getValueVector(fieldId, clazz);
+ }
+
+ @Override
+ public IterOutcome next() {
+
+ IterOutcome upstream = incoming.next();
+ logger.debug("Upstream... {}", upstream);
+ switch(upstream){
+ case NONE:
+ case NOT_YET:
+ case STOP:
+ return upstream;
+ case OK_NEW_SCHEMA:
+ try{
+ filter = createNewFilterer();
+ }catch(SchemaChangeException ex){
+ incoming.kill();
+ logger.error("Failure during query", ex);
+ context.fail(ex);
+ return IterOutcome.STOP;
+ }
+ // fall through.
+ case OK:
+ int recordCount = incoming.getRecordCount();
+ sv.allocateNew(recordCount);
+ filter.filterBatch(recordCount);
+ for(ValueVector v : this.outputVectors){
+ ValueVector.Mutator m = v.getMutator();
+ if(m instanceof NonRepeatedMutator){
+ ((NonRepeatedMutator) m).setValueCount(recordCount);
+ }else{
+ throw new UnsupportedOperationException();
+ }
+ }
+ return upstream; // change if upstream changed, otherwise normal.
+ default:
+ throw new UnsupportedOperationException();
+ }
+ }
+
+
+ private Filterer createNewFilterer() throws SchemaChangeException{
+ if(outputVectors != null){
+ for(ValueVector v : outputVectors){
+ v.close();
+ }
+ }
+ this.outputVectors = Lists.newArrayList();
+ this.vh = new VectorHolder(outputVectors);
+ LogicalExpression filterExpression = filterConfig.getExpr();
+ final ErrorCollector collector = new ErrorCollectorImpl();
+ final List<TransferPair> transfers = Lists.newArrayList();
+ final CodeGenerator<Filterer> cg = new CodeGenerator<Filterer>(Filterer.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(filterExpression, incoming, collector);
+ if(collector.hasErrors()){
+ throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
+ }
+
+ cg.addExpr(new ReturnValueExpression(expr));
+
+ for(ValueVector v : incoming){
+ TransferPair pair = v.getTransferPair();
+ outputVectors.add(pair.getTo());
+ transfers.add(pair);
+ }
+
+ SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(SelectionVectorMode.TWO_BYTE);
+ for(ValueVector v : outputVectors){
+ bldr.addField(v.getField());
+ }
+ this.outSchema = bldr.build();
+
+ try {
+ TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]);
+ Filterer filterer = context.getImplementationClass(cg);
+ filterer.setup(context, incoming, this, tx);
+ return filterer;
+ } catch (ClassTransformationException | IOException e) {
+ throw new SchemaChangeException("Failure while attempting to load generated class", e);
+ }
+ }
+
+ @Override
+ public WritableBatch getWritableBatch() {
+ return WritableBatch.get(sv.getCount(), outputVectors);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
index 216bfec..4092911 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
@@ -1,18 +1,27 @@
package org.apache.drill.exec.physical.impl.filter;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.selection.SelectionVector2;
-public abstract class FilterTemplate {
+public abstract class FilterTemplate implements Filterer{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterTemplate.class);
- SelectionVector2 outgoingSelectionVector;
- SelectionVector2 incomingSelectionVector;
+ private SelectionVector2 outgoingSelectionVector;
+ private SelectionVector2 incomingSelectionVector;
+ private SelectionVectorMode svMode;
+ private TransferPair[] transfers;
- public void setup(RecordBatch incoming, RecordBatch outgoing){
- outgoingSelectionVector = outgoing.getSelectionVector2();
-
- switch(incoming.getSchema().getSelectionVector()){
+ @Override
+ public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException{
+ this.transfers = transfers;
+ this.outgoingSelectionVector = outgoing.getSelectionVector2();
+ this.svMode = incoming.getSchema().getSelectionVector();
+
+ switch(svMode){
case NONE:
break;
case TWO_BYTE:
@@ -21,28 +30,54 @@ public abstract class FilterTemplate {
default:
throw new UnsupportedOperationException();
}
+ doSetup(context, incoming, outgoing);
+ }
+
+ private void doTransfers(){
+ for(TransferPair t : transfers){
+ t.transfer();
+ }
}
- public void filterBatchSV2(int recordCount){
+ public void filterBatch(int recordCount){
+ doTransfers();
+ switch(svMode){
+ case NONE:
+ filterBatchNoSV(recordCount);
+ break;
+ case TWO_BYTE:
+ filterBatchSV2(recordCount);
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private void filterBatchSV2(int recordCount){
int svIndex = 0;
- for(char i =0; i < recordCount; i++){
- if(include(i)){
- outgoingSelectionVector.setIndex(svIndex, i);
+ final int count = recordCount*2;
+ for(int i = 0; i < count; i+=2){
+ char index = incomingSelectionVector.getIndex(i);
+ if(doEval(i, 0)){
+ outgoingSelectionVector.setIndex(svIndex, index);
svIndex+=2;
}
}
+ outgoingSelectionVector.setRecordCount(svIndex/2);
}
- public void filterBatchNoSV(int recordCount){
+ private void filterBatchNoSV(int recordCount){
int svIndex = 0;
for(char i =0; i < recordCount; i++){
- if(include(i)){
+ if(doEval(i, 0)){
outgoingSelectionVector.setIndex(svIndex, i);
svIndex+=2;
}
}
+ outgoingSelectionVector.setRecordCount(svIndex/2);
}
- protected abstract boolean include(int index);
+ protected abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
+ protected abstract boolean doEval(int inIndex, int outIndex);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
new file mode 100644
index 0000000..b270869
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
@@ -0,0 +1,19 @@
+package org.apache.drill.exec.physical.impl.filter;
+
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.project.Projector;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+
+public interface Filterer {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Filterer.class);
+
+ public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException;
+ public void filterBatch(int recordCount);
+
+ public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION = new TemplateClassDefinition<Filterer>( //
+ Filterer.class, "org.apache.drill.exec.physical.impl.filter.FilterTemplate", FilterEvaluator.class, boolean.class);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ReturnValueExpression.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ReturnValueExpression.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ReturnValueExpression.java
new file mode 100644
index 0000000..a794d63
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ReturnValueExpression.java
@@ -0,0 +1,39 @@
+package org.apache.drill.exec.physical.impl.filter;
+
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.visitors.ExprVisitor;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+
+public class ReturnValueExpression implements LogicalExpression{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ReturnValueExpression.class);
+
+ private LogicalExpression child;
+
+ public ReturnValueExpression(LogicalExpression child) {
+ this.child = child;
+ }
+
+ public LogicalExpression getChild() {
+ return child;
+ }
+
+ @Override
+ public MajorType getMajorType() {
+ return Types.NULL;
+ }
+
+ @Override
+ public <T, V, E extends Exception> T accept(ExprVisitor<T, V, E> visitor, V value) throws E {
+ return visitor.visitUnknown(this, value);
+ }
+
+ @Override
+ public ExpressionPosition getPosition() {
+ return ExpressionPosition.UNKNOWN;
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/SelectionVectorPopulationExpression.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/SelectionVectorPopulationExpression.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/SelectionVectorPopulationExpression.java
deleted file mode 100644
index f253695..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/SelectionVectorPopulationExpression.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package org.apache.drill.exec.physical.impl.filter;
-
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.visitors.ExprVisitor;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-
-public class SelectionVectorPopulationExpression implements LogicalExpression{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVectorPopulationExpression.class);
-
- private LogicalExpression child;
-
- public SelectionVectorPopulationExpression(LogicalExpression child) {
- this.child = child;
- }
-
- public LogicalExpression getChild() {
- return child;
- }
-
- @Override
- public MajorType getMajorType() {
- return Types.NULL;
- }
-
- @Override
- public <T, V, E extends Exception> T accept(ExprVisitor<T, V, E> visitor, V value) throws E {
- return visitor.visitUnknown(this, value);
- }
-
- @Override
- public ExpressionPosition getPosition() {
- return ExpressionPosition.UNKNOWN;
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
index 86caf28..5fd1fb4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
@@ -7,6 +7,6 @@ import org.apache.drill.exec.record.RecordBatch;
public interface ProjectEvaluator {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectEvaluator.class);
- public abstract void setupEvaluators(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
- public abstract void doPerRecordWork(int inIndex, int outIndex);
+ public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
+ public abstract void doEval(int inIndex, int outIndex);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 3d1e3f7..060cd92 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -1,6 +1,7 @@
package org.apache.drill.exec.physical.impl.project;
import java.io.IOException;
+import java.util.Iterator;
import java.util.List;
import org.apache.drill.common.expression.ErrorCollector;
@@ -59,7 +60,11 @@ public class ProjectRecordBatch implements RecordBatch{
this.context = context;
}
-
+ @Override
+ public Iterator<ValueVector> iterator() {
+ return outputVectors.iterator();
+ }
+
@Override
public FragmentContext getContext() {
return context;
@@ -180,7 +185,7 @@ public class ProjectRecordBatch implements RecordBatch{
allocationVectors.add(vector);
outputVectors.add(vector);
ValueVectorWriteExpression write = new ValueVectorWriteExpression(outputVectors.size() - 1, expr);
- cg.addNextWrite(write);
+ cg.addExpr(write);
}
}
@@ -192,7 +197,7 @@ public class ProjectRecordBatch implements RecordBatch{
this.outSchema = bldr.build();
try {
- Projector projector = context.getImplementationClass(Projector.TEMPLATE_DEFINITION, cg);
+ Projector projector = context.getImplementationClass(cg);
projector.setup(context, incoming, this, transfers);
return projector;
} catch (ClassTransformationException | IOException e) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
index 2787f0c..0d1e201 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
@@ -16,6 +16,6 @@ public interface Projector {
public abstract int projectRecords(int recordCount, int firstOutputIndex);
public static TemplateClassDefinition<Projector> TEMPLATE_DEFINITION = new TemplateClassDefinition<Projector>( //
- Projector.class, "org.apache.drill.exec.physical.impl.project.ProjectorTemplate", ProjectEvaluator.class, "setupEvaluators", "doPerRecordWork");
+ Projector.class, "org.apache.drill.exec.physical.impl.project.ProjectorTemplate", ProjectEvaluator.class, null);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
index 486c7b0..735d355 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
@@ -33,7 +33,7 @@ public abstract class ProjectorTemplate implements Projector {
case TWO_BYTE:
final int count = recordCount*2;
for(int i = 0; i < count; i+=2, firstOutputIndex++){
- doPerRecordWork(vector2.getIndex(i), firstOutputIndex);
+ doEval(vector2.getIndex(i), firstOutputIndex);
}
return recordCount;
@@ -45,7 +45,7 @@ public abstract class ProjectorTemplate implements Projector {
}
final int countN = recordCount;
for (int i = 0; i < countN; i++, firstOutputIndex++) {
- doPerRecordWork(i, firstOutputIndex);
+ doEval(i, firstOutputIndex);
}
return recordCount;
@@ -68,11 +68,11 @@ public abstract class ProjectorTemplate implements Projector {
break;
}
this.transfers = ImmutableList.copyOf(transfers);
- setupEvaluators(context, incoming, outgoing);
+ setupEval(context, incoming, outgoing);
}
- protected abstract void setupEvaluators(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
- protected abstract void doPerRecordWork(int inIndex, int outIndex);
+ protected abstract void setupEval(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
+ protected abstract void doEval(int inIndex, int outIndex);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index 650a148..ff856d4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -29,7 +29,7 @@ import org.apache.drill.exec.vector.ValueVector;
* composed of ValueVectors, ideally a batch fits within L2 cache (~256k per core). The set of value vectors do not
* change unless the next() IterOutcome is a *_NEW_SCHEMA type.
*/
-public interface RecordBatch {
+public interface RecordBatch extends Iterable<ValueVector>{
/**
* Describes the outcome of a RecordBatch being incremented forward.
@@ -88,6 +88,7 @@ public interface RecordBatch {
public abstract TypedFieldId getValueVectorId(SchemaPath path);
+
public abstract <T extends ValueVector> T getValueVectorById(int fieldId, Class<?> clazz);
/**
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index e2a1648..5f7648b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -33,9 +33,9 @@ import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
import org.apache.drill.exec.vector.TypeHelper;
import org.apache.drill.exec.vector.ValueVector;
-import com.beust.jcommander.internal.Lists;
-import com.beust.jcommander.internal.Maps;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
public class RecordBatchLoader implements Iterable<ValueVector>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchLoader.class);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
index 0989c1d..34e4043 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
@@ -23,8 +23,8 @@ import java.util.Set;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import com.beust.jcommander.internal.Sets;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
/**
* A reusable builder that supports the creation of BatchSchemas. Can have a supporting expected object. If the expected Schema object is defined, the
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
index cdc136e..88f0c79 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
@@ -19,16 +19,20 @@ package org.apache.drill.exec.record.selection;
import io.netty.buffer.ByteBuf;
+import java.io.Closeable;
+import java.io.IOException;
+
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.record.DeadBuf;
/**
* A selection vector that fronts, at most, a
*/
-public class SelectionVector2{
+public class SelectionVector2 implements Closeable{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector2.class);
private final BufferAllocator allocator;
+ private int recordCount;
private ByteBuf buffer = DeadBuf.DEAD_BUFFER;
public SelectionVector2(BufferAllocator allocator) {
@@ -36,14 +40,40 @@ public class SelectionVector2{
}
public int getCount(){
- return -1;
+ return recordCount;
}
- public int getIndex(int directIndex){
+ public char getIndex(int directIndex){
return buffer.getChar(directIndex);
}
public void setIndex(int directIndex, char value){
buffer.setChar(directIndex, value);
}
+
+ public void allocateNew(int size){
+ clear();
+ buffer = allocator.buffer(size * 2);
+ }
+
+
+ public void clear() {
+ if (buffer != DeadBuf.DEAD_BUFFER) {
+ buffer.release();
+ buffer = DeadBuf.DEAD_BUFFER;
+ recordCount = 0;
+ }
+ }
+
+ public void setRecordCount(int recordCount){
+ logger.debug("Seting record count to {}", recordCount);
+ this.recordCount = recordCount;
+ }
+
+ @Override
+ public void close() throws IOException {
+ clear();
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
index 8513dfe..3a57410 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
@@ -39,13 +39,14 @@ import org.apache.drill.exec.vector.NullableVarChar4Vector;
import org.apache.drill.exec.vector.TypeHelper;
import org.apache.drill.exec.vector.ValueVector;
-import com.beust.jcommander.internal.Maps;
+
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.google.common.base.Charsets;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.common.io.InputSupplier;
import com.google.common.io.Resources;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
index cb1e1d6..d2889ed 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
@@ -40,7 +40,7 @@ public class TestClassTransformation {
TemplateClassDefinition<ExampleExternalInterface> def = new TemplateClassDefinition<ExampleExternalInterface>(
ExampleExternalInterface.class, "org.apache.drill.exec.compile.ExampleTemplate",
- ExampleInternalInterface.class, "a", "b");
+ ExampleInternalInterface.class, null);
ClassTransformer ct = new ClassTransformer();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
index 623af0e..c610374 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
@@ -92,7 +92,7 @@ public class ExpressionTest {
}
CodeGenerator<Projector> cg = new CodeGenerator<Projector>(Projector.TEMPLATE_DEFINITION, new FunctionImplementationRegistry(DrillConfig.create()));
- cg.addNextWrite(new ValueVectorWriteExpression(-1, materializedExpr));
+ cg.addExpr(new ValueVectorWriteExpression(-1, materializedExpr));
return cg.generate();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
index d125ec0..c6434f7 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
@@ -5,6 +5,7 @@ import org.apache.drill.exec.physical.impl.ScreenCreator.ScreenRoot;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
+import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.vector.ValueVector;
public class SimpleRootExec implements RootExec{
@@ -21,6 +22,9 @@ public class SimpleRootExec implements RootExec{
}
+ public SelectionVector2 getSelectionVector2(){
+ return incoming.getSelectionVector2();
+ }
public <T extends ValueVector> T getValueVectorById(SchemaPath path, Class<?> vvClass){
TypedFieldId tfid = incoming.getValueVectorId(path);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
new file mode 100644
index 0000000..df11aa7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
@@ -0,0 +1,58 @@
+package org.apache.drill.exec.physical.impl.filter;
+
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.SimpleRootExec;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.junit.After;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.yammer.metrics.MetricRegistry;
+
+public class TestSimpleFilter {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSimpleFilter.class);
+ DrillConfig c = DrillConfig.create();
+
+
+ @Test
+ public void project(@Injectable final DrillbitContext bitContext, @Injectable UserClientConnection connection) throws Exception{
+ System.out.println(System.getProperty("java.class.path"));
+
+
+ new NonStrictExpectations(){{
+ bitContext.getMetrics(); result = new MetricRegistry("test");
+ bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ }};
+
+
+ PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+ PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/filter/test1.json"), Charsets.UTF_8));
+ FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+ while(exec.next()){
+ System.out.println(exec.getSelectionVector2().getCount());
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception{
+ // pause to get logger to catch up.
+ Thread.sleep(1000);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
index 925faf7..68b8881 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
@@ -31,7 +31,7 @@ import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
import org.junit.Test;
-import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.Lists;
import com.google.common.collect.Range;
public class ExpressionTreeMaterializerTest {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
index 7c9e8f4..4a0358e 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
@@ -24,7 +24,8 @@ import org.apache.drill.exec.vector.ValueVector;
import org.junit.Ignore;
import org.junit.Test;
-import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.Lists;
+
public class JSONRecordReaderTest {
private static final Charset UTF_8 = Charset.forName("UTF-8");
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json b/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json
new file mode 100644
index 0000000..a892c70
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json
@@ -0,0 +1,34 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"mock-scan",
+ url: "http://apache.org",
+ entries:[
+ {records: 100, types: [
+ {name: "blue", type: "INT", mode: "REQUIRED"},
+ {name: "red", type: "BIGINT", mode: "REQUIRED"},
+ {name: "green", type: "INT", mode: "REQUIRED"}
+ ]}
+ ]
+ },
+ {
+ @id:2,
+ child: 1,
+ pop:"filter",
+ expr: "true"
+ },
+ {
+ @id: 3,
+ child: 2,
+ pop: "screen"
+ }
+ ]
+}
\ No newline at end of file
Re: [51/53] [abbrv] git commit: Initial working filter operator
Posted by Ted Dunning <te...@gmail.com>.
On Fri, Jul 19, 2013 at 6:58 PM, <ja...@apache.org> wrote:
> Initial working filter operator
>
>
> Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
> Commit:
> http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/65e2cfce
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/65e2cfce
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/65e2cfce
>
> Branch: refs/heads/master
> Commit: 65e2cfce5364ccfe30b83b9dddeb304b79efbc76
> Parents: 57de7ed
> Author: Jacques Nadeau <ja...@apache.org>
> Authored: Wed Jul 17 00:27:31 2013 -0700
> Committer: Jacques Nadeau <ja...@apache.org>
> Committed: Fri Jul 19 14:53:31 2013 -0700
>
> ----------------------------------------------------------------------
> .../org/apache/drill/exec/cache/HazelCache.java | 1 -
> .../drill/exec/compile/ClassTransformer.java | 17 +-
> .../drill/exec/compile/JDKClassCompiler.java | 5 +-
> .../exec/compile/TemplateClassDefinition.java | 24 +--
> .../apache/drill/exec/expr/CodeGenerator.java | 20 +-
> .../drill/exec/expr/EvaluationVisitor.java | 22 +-
> .../drill/exec/expr/fn/FunctionConverter.java | 2 +-
> .../exec/expr/fn/MethodGrabbingVisitor.java | 3 +-
> .../apache/drill/exec/ops/FragmentContext.java | 9 +-
> .../drill/exec/physical/impl/ImplCreator.java | 10 +-
> .../drill/exec/physical/impl/ScanBatch.java | 11 +-
> .../exec/physical/impl/WireRecordBatch.java | 8 +-
> .../physical/impl/filter/ExampleFilter.java | 111 ----------
> .../impl/filter/FilterBatchCreator.java | 23 +++
> .../physical/impl/filter/FilterEvaluator.java | 10 +
> .../physical/impl/filter/FilterRecordBatch.java | 200 +++++++++++++++++++
> .../physical/impl/filter/FilterTemplate.java | 63 ++++--
> .../exec/physical/impl/filter/Filterer.java | 19 ++
> .../impl/filter/ReturnValueExpression.java | 39 ++++
> .../SelectionVectorPopulationExpression.java | 39 ----
> .../physical/impl/project/ProjectEvaluator.java | 4 +-
> .../impl/project/ProjectRecordBatch.java | 11 +-
> .../exec/physical/impl/project/Projector.java | 2 +-
> .../impl/project/ProjectorTemplate.java | 10 +-
> .../apache/drill/exec/record/RecordBatch.java | 3 +-
> .../drill/exec/record/RecordBatchLoader.java | 4 +-
> .../apache/drill/exec/record/SchemaBuilder.java | 2 +-
> .../exec/record/selection/SelectionVector2.java | 36 +++-
> .../drill/exec/store/JSONRecordReader.java | 3 +-
> .../exec/compile/TestClassTransformation.java | 2 +-
> .../apache/drill/exec/expr/ExpressionTest.java | 2 +-
> .../exec/physical/impl/SimpleRootExec.java | 4 +
> .../physical/impl/filter/TestSimpleFilter.java | 58 ++++++
> .../record/ExpressionTreeMaterializerTest.java | 2 +-
> .../drill/exec/store/JSONRecordReaderTest.java | 3 +-
> .../src/test/resources/filter/test1.json | 34 ++++
> 36 files changed, 566 insertions(+), 250 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
> index f4fdbfa..fe4a212 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
> @@ -29,7 +29,6 @@ import
> org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
> import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
> import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
>
> -import com.beust.jcommander.internal.Lists;
> import com.google.common.cache.Cache;
> import com.google.common.cache.CacheBuilder;
> import com.google.protobuf.InvalidProtocolBufferException;
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
> index 4bf6e7e..d7cde2a 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
> @@ -44,11 +44,11 @@ import org.objectweb.asm.tree.ClassNode;
> import org.objectweb.asm.tree.FieldNode;
> import org.objectweb.asm.tree.MethodNode;
>
> -import com.beust.jcommander.internal.Sets;
> import com.google.common.base.Preconditions;
> import com.google.common.cache.CacheBuilder;
> import com.google.common.cache.CacheLoader;
> import com.google.common.cache.LoadingCache;
> +import com.google.common.collect.Sets;
> import com.google.common.io.Files;
> import com.google.common.io.Resources;
>
> @@ -112,15 +112,16 @@ public class ClassTransformer {
> String materializedClassName) throws ClassTransformationException {
>
> try {
> -
> + long t0 = System.nanoTime();
> final byte[] implementationClass =
> classLoader.getClassByteCode(materializedClassName, entireClass);
>
> // Get Template Class
> final String templateClassName =
> templateDefinition.getTemplateClassName().replaceAll("\\.", File.separator);
> final String templateClassPath = File.separator + templateClassName
> + ".class";
> + long t1 = System.nanoTime();
> final byte[] templateClass =
> getClassByteCodeFromPath(templateClassPath);
> - int fileNum = new Random().nextInt(100);
> - Files.write(templateClass, new
> File(String.format("/tmp/%d-template.class", fileNum)));
> +// int fileNum = new Random().nextInt(100);
> + //Files.write(templateClass, new
> File(String.format("/tmp/%d-template.class", fileNum)));
> // Generate Merge Class
>
> // Setup adapters for merging, remapping class names and class
> writing. This is done in reverse order of how they
> @@ -130,7 +131,11 @@ public class ClassTransformer {
> RemapClasses remapper = new RemapClasses(oldTemplateSlashName,
> materializedSlashName);
>
> {
> +
> ClassNode impl = getClassNodeFromByteCode(implementationClass);
> + long t2 = System.nanoTime();
> + logger.debug("Compile {}, decode template {}", (t1 -
> t0)/1000/1000, (t2- t1)/1000/1000);
> +
> ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES);
>
> ClassVisitor remappingAdapter = new RemappingClassAdapter(cw,
> remapper);
> @@ -139,7 +144,7 @@ public class ClassTransformer {
> ClassReader tReader = new ClassReader(templateClass);
> tReader.accept(mergingAdapter, ClassReader.EXPAND_FRAMES);
> byte[] outputClass = cw.toByteArray();
> - Files.write(outputClass, new
> File(String.format("/tmp/%d-output.class", fileNum)));
> +// Files.write(outputClass, new
> File(String.format("/tmp/%d-output.class", fileNum)));
> outputClass = cw.toByteArray();
>
> // Load the class
> @@ -160,7 +165,7 @@ public class ClassTransformer {
> reader.accept(remap, ClassReader.EXPAND_FRAMES);
> byte[] newByteCode = subcw.toByteArray();
> classLoader.injectByteCode(s.replace(oldTemplateSlashName,
> materializedSlashName).replace('/', '.'), newByteCode);
> - Files.write(subcw.toByteArray(), new
> File(String.format("/tmp/%d-sub-%d.class", fileNum, i)));
> +// Files.write(subcw.toByteArray(), new
> File(String.format("/tmp/%d-sub-%d.class", fileNum, i)));
> i++;
> }
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JDKClassCompiler.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JDKClassCompiler.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JDKClassCompiler.java
> index 15e87fe..8f6e572 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JDKClassCompiler.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JDKClassCompiler.java
> @@ -30,9 +30,9 @@ import java.util.List;
> import javax.tools.Diagnostic;
> import javax.tools.DiagnosticListener;
> import javax.tools.JavaCompiler;
> +import javax.tools.JavaCompiler.CompilationTask;
> import javax.tools.JavaFileManager;
> import javax.tools.JavaFileObject;
> -import javax.tools.JavaCompiler.CompilationTask;
> import javax.tools.JavaFileObject.Kind;
> import javax.tools.SimpleJavaFileObject;
> import javax.tools.StandardLocation;
> @@ -43,7 +43,8 @@ import org.codehaus.commons.compiler.Location;
> import org.codehaus.commons.compiler.jdk.ByteArrayJavaFileManager;
> import
> org.codehaus.commons.compiler.jdk.ByteArrayJavaFileManager.ByteArrayJavaFileObject;
>
> -import com.beust.jcommander.internal.Lists;
> +import com.google.common.collect.Lists;
> +
>
> class JDKClassCompiler implements ClassCompiler {
> static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(JDKClassCompiler.class);
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
> index 5a01dce..20ef361 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
> @@ -17,7 +17,6 @@
>
> ******************************************************************************/
> package org.apache.drill.exec.compile;
>
> -import java.lang.reflect.Method;
>
>
> public class TemplateClassDefinition<T>{
> @@ -25,24 +24,19 @@ public class TemplateClassDefinition<T>{
> private final Class<T> externalInterface;
> private final String templateClassName;
> private final Class<?> internalInterface;
> - private final String setupName;
> - private final String evalName;
> -
> + private final Class<?> evalReturnType;
>
> - public TemplateClassDefinition(Class<T> externalInterface, String
> templateClassName, Class<?> internalInterface, String setupName, String
> evalName) {
> + public TemplateClassDefinition(Class<T> externalInterface, String
> templateClassName, Class<?> internalInterface, Class<?> evalReturnType) {
> super();
> this.externalInterface = externalInterface;
> this.templateClassName = templateClassName;
> this.internalInterface = internalInterface;
> - this.setupName = setupName;
> - this.evalName = evalName;
> + this.evalReturnType = evalReturnType;
> }
>
> -
> public Class<T> getExternalInterface() {
> return externalInterface;
> }
> -
>
> public Class<?> getInternalInterface() {
> return internalInterface;
> @@ -52,16 +46,8 @@ public class TemplateClassDefinition<T>{
> return templateClassName;
> }
>
> - public String getSetupName() {
> - return setupName;
> + public Class<?> getEvalReturnType() {
> + return evalReturnType;
> }
> -
> -
> - public String getEvalName() {
> - return evalName;
> - }
> -
> -
> -
>
> }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
> index ed6bd9b..241c1cc 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
> @@ -1,10 +1,13 @@
> package org.apache.drill.exec.expr;
>
> import java.io.IOException;
> +import java.lang.reflect.Method;
>
> +import org.apache.drill.common.expression.LogicalExpression;
> import org.apache.drill.common.types.TypeProtos.DataMode;
> import org.apache.drill.common.types.TypeProtos.MajorType;
> import org.apache.drill.exec.compile.TemplateClassDefinition;
> +import org.apache.drill.exec.exception.SchemaChangeException;
> import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
> import org.apache.drill.exec.expr.holders.BooleanHolder;
> import org.apache.drill.exec.expr.holders.IntHolder;
> @@ -36,8 +39,6 @@ public class CodeGenerator<T> {
> private JBlock currentEvalBlock;
> private JBlock currentSetupBlock;
> private final EvaluationVisitor evaluationVisitor;
> - private final String setupName;
> - private final String perRecordName;
> private final TemplateClassDefinition<T> definition;
> private JCodeModel model;
> private int index = 0;
> @@ -46,8 +47,6 @@ public class CodeGenerator<T> {
> super();
> try{
> this.definition = definition;
> - this.setupName = definition.getSetupName();
> - this.perRecordName = definition.getEvalName();
> this.model = new JCodeModel();
> this.clazz =
> model._package("org.apache.drill.exec.test.generated")._class("Test1");
> clazz._implements(definition.getInternalInterface());
> @@ -59,7 +58,7 @@ public class CodeGenerator<T> {
> }
> }
>
> - public void addNextWrite(ValueVectorWriteExpression ex){
> + public void addExpr(LogicalExpression ex){
> logger.debug("Adding next write {}", ex);
> currentEvalBlock = new JBlock();
> parentEvalBlock.add(currentEvalBlock);
> @@ -80,20 +79,27 @@ public class CodeGenerator<T> {
> return currentSetupBlock;
> }
>
> +
> + public TemplateClassDefinition<T> getDefinition() {
> + return definition;
> + }
> +
> public String generate() throws IOException{
>
> {
> //setup method
> - JMethod m = clazz.method(JMod.PUBLIC, model.VOID, this.setupName);
> + JMethod m = clazz.method(JMod.PUBLIC, model.VOID, "doSetup");
> m.param(model._ref(FragmentContext.class), "context");
> m.param(model._ref(RecordBatch.class), "incoming");
> m.param(model._ref(RecordBatch.class), "outgoing");
> + m._throws(SchemaChangeException.class);
> m.body().add(parentSetupBlock);
> }
>
> {
> // eval method.
> - JMethod m = clazz.method(JMod.PUBLIC, model.VOID,
> this.perRecordName);
> + JType ret = definition.getEvalReturnType() == null ? model.VOID :
> model._ref(definition.getEvalReturnType());
> + JMethod m = clazz.method(JMod.PUBLIC, ret, "doEval");
> m.param(model.INT, "inIndex");
> m.param(model.INT, "outIndex");
> m.body().add(parentEvalBlock);
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
> index 6b0e499..c219d9c 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
> @@ -14,7 +14,7 @@ import org.apache.drill.common.types.Types;
> import org.apache.drill.exec.expr.CodeGenerator.HoldingContainer;
> import org.apache.drill.exec.expr.fn.FunctionHolder;
> import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
> -import
> org.apache.drill.exec.physical.impl.filter.SelectionVectorPopulationExpression;
> +import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
> import org.apache.drill.exec.record.selection.SelectionVector2;
> import org.apache.drill.exec.vector.TypeHelper;
>
> @@ -117,7 +117,7 @@ public class EvaluationVisitor extends
> AbstractExprVisitor<HoldingContainer, Cod
> @Override
> public HoldingContainer visitBooleanConstant(BooleanExpression e,
> CodeGenerator<?> generator) throws RuntimeException {
> HoldingContainer out = generator.declare(e.getMajorType());
> - generator.getBlock().assign(out.getValue(),
> JExpr.lit(e.getBoolean()));
> + generator.getBlock().assign(out.getValue(), JExpr.lit(e.getBoolean()
> ? 1 : 0));
> return out;
> }
>
> @@ -127,8 +127,8 @@ public class EvaluationVisitor extends
> AbstractExprVisitor<HoldingContainer, Cod
> return visitValueVectorExpression((ValueVectorReadExpression) e,
> generator);
> }else if(e instanceof ValueVectorWriteExpression){
> return visitValueVectorWriteExpression((ValueVectorWriteExpression)
> e, generator);
> - }else if(e instanceof SelectionVectorPopulationExpression){
> - return
> visitSelectionVectorExpression((SelectionVectorPopulationExpression) e,
> generator);
> + }else if(e instanceof ReturnValueExpression){
> + return visitReturnValueExpression((ReturnValueExpression) e,
> generator);
> }else{
> return super.visitUnknown(e, generator);
> }
> @@ -196,21 +196,11 @@ public class EvaluationVisitor extends
> AbstractExprVisitor<HoldingContainer, Cod
> }
>
>
> - private HoldingContainer
> visitSelectionVectorExpression(SelectionVectorPopulationExpression e,
> CodeGenerator<?> generator){
> - JType svClass = generator.getModel()._ref(SelectionVector2.class);
> - JVar sv = generator.declareClassField("sv", svClass);
> - JVar index = generator.declareClassField("svIndex",
> generator.getModel().CHAR);
> + private HoldingContainer
> visitReturnValueExpression(ReturnValueExpression e, CodeGenerator<?>
> generator){
> LogicalExpression child = e.getChild();
>
> Preconditions.checkArgument(child.getMajorType().equals(Types.REQUIRED_BOOLEAN));
> HoldingContainer hc = child.accept(this, generator);
> - generator.getBlock()._return(hc.getValue());
> -
> -// JBlock blk = generator.getSetupBlock();
> -// blk.assign(sv,
> JExpr.direct("outgoing").invoke("getSelectionVector2"));
> -// JConditional jc = blk._if(hc.getValue());
> -// JBlock body = jc._then();
> -// body.add(sv.invoke("set").arg(index).arg(JExpr.direct("inIndex")));
> -// body.assign(index, index.plus(JExpr.lit(1)));
> + generator.getBlock()._return(hc.getValue().eq(JExpr.lit(1)));
> return null;
> }
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
> index 84f04f0..8e0f1be 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java
> @@ -25,7 +25,7 @@ import org.codehaus.janino.Parser;
> import org.codehaus.janino.Scanner;
> import org.mortbay.util.IO;
>
> -import com.beust.jcommander.internal.Lists;
> +import com.google.common.collect.Lists;
> import com.google.common.io.InputSupplier;
> import com.google.common.io.Resources;
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/MethodGrabbingVisitor.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/MethodGrabbingVisitor.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/MethodGrabbingVisitor.java
> index 22b9046..57268ee 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/MethodGrabbingVisitor.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/MethodGrabbingVisitor.java
> @@ -8,7 +8,8 @@ import org.codehaus.janino.Java.ClassDeclaration;
> import org.codehaus.janino.Java.MethodDeclarator;
> import org.codehaus.janino.util.Traverser;
>
> -import com.beust.jcommander.internal.Maps;
> +import com.google.common.collect.Maps;
> +
>
> public class MethodGrabbingVisitor{
> static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(MethodGrabbingVisitor.class);
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
> index 876b873..2ae4afa 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
> @@ -103,11 +103,14 @@ public class FragmentContext {
> return context.getAllocator();
> }
>
> - public <T> T getImplementationClass(TemplateClassDefinition<T>
> templateDefinition, CodeGenerator<T> cg) throws
> ClassTransformationException, IOException{
> - return transformer.getImplementationClass(this.loader,
> templateDefinition, cg.generate(), cg.getMaterializedClassName());
> + public <T> T getImplementationClass(CodeGenerator<T> cg) throws
> ClassTransformationException, IOException{
> + long t1 = System.nanoTime();
> + T t= transformer.getImplementationClass(this.loader,
> cg.getDefinition(), cg.generate(), cg.getMaterializedClassName());
> + System.out.println( (System.nanoTime() - t1)/1000/1000 );
> + return t;
> +
> }
>
> -
> public void addMetricsToStatus(FragmentStatus.Builder stats){
> stats.setBatchesCompleted(batchesCompleted.get());
> stats.setDataProcessed(dataProcessed.get());
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
> index 739c0d4..f96d6f3 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
> @@ -26,12 +26,14 @@ import
> org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
> import org.apache.drill.exec.physical.base.FragmentRoot;
> import org.apache.drill.exec.physical.base.PhysicalOperator;
> import org.apache.drill.exec.physical.base.Scan;
> +import org.apache.drill.exec.physical.config.Filter;
> import org.apache.drill.exec.physical.config.MockScanBatchCreator;
> import org.apache.drill.exec.physical.config.MockScanPOP;
> import org.apache.drill.exec.physical.config.Project;
> import org.apache.drill.exec.physical.config.RandomReceiver;
> import org.apache.drill.exec.physical.config.Screen;
> import org.apache.drill.exec.physical.config.SingleSender;
> +import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator;
> import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator;
> import org.apache.drill.exec.record.RecordBatch;
>
> @@ -46,6 +48,7 @@ public class ImplCreator extends
> AbstractPhysicalVisitor<RecordBatch, FragmentCo
> private RandomReceiverCreator rrc = new RandomReceiverCreator();
> private SingleSenderCreator ssc = new SingleSenderCreator();
> private ProjectBatchCreator pbc = new ProjectBatchCreator();
> + private FilterBatchCreator fbc = new FilterBatchCreator();
> private RootExec root = null;
>
> private ImplCreator(){}
> @@ -78,10 +81,13 @@ public class ImplCreator extends
> AbstractPhysicalVisitor<RecordBatch, FragmentCo
> root = sc.getRoot(context, op, getChildren(op, context));
> return null;
> }
> -
> -
>
> @Override
> + public RecordBatch visitFilter(Filter filter, FragmentContext context)
> throws ExecutionSetupException {
> + return fbc.getBatch(context, filter, getChildren(filter, context));
> + }
> +
> + @Override
> public RecordBatch visitSingleSender(SingleSender op, FragmentContext
> context) throws ExecutionSetupException {
> root = ssc.getRoot(context, op, getChildren(op, context));
> return null;
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
> index 5688bb1..084db54 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
> @@ -26,7 +26,6 @@ import org.apache.drill.common.expression.SchemaPath;
> import org.apache.drill.exec.exception.SchemaChangeException;
> import org.apache.drill.exec.ops.FragmentContext;
> import org.apache.drill.exec.record.BatchSchema;
> -import org.apache.drill.exec.record.InvalidValueAccessor;
> import org.apache.drill.exec.record.MaterializedField;
> import org.apache.drill.exec.record.RecordBatch;
> import org.apache.drill.exec.record.SchemaBuilder;
> @@ -36,9 +35,8 @@ import
> org.apache.drill.exec.record.selection.SelectionVector4;
> import org.apache.drill.exec.store.RecordReader;
> import org.apache.drill.exec.vector.ValueVector;
>
> -import com.beust.jcommander.internal.Lists;
> -import com.beust.jcommander.internal.Maps;
> -import com.carrotsearch.hppc.procedures.IntObjectProcedure;
> +import com.google.common.collect.Lists;
> +import com.google.common.collect.Maps;
>
> /**
> * Record batch used for a particular scan. Operators against one or more
> @@ -171,6 +169,11 @@ public class ScanBatch implements RecordBatch {
> }
>
> @Override
> + public Iterator<ValueVector> iterator() {
> + return vectors.iterator();
> + }
> +
> + @Override
> public WritableBatch getWritableBatch() {
> return WritableBatch.get(this.getRecordCount(), vectors);
> }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
> index d2b8bfd..a575f69 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
> @@ -17,6 +17,8 @@
>
> ******************************************************************************/
> package org.apache.drill.exec.physical.impl;
>
> +import java.util.Iterator;
> +
> import org.apache.drill.common.expression.SchemaPath;
> import org.apache.drill.exec.exception.SchemaChangeException;
> import org.apache.drill.exec.ops.FragmentContext;
> @@ -66,9 +68,13 @@ public class WireRecordBatch implements RecordBatch{
> public void kill() {
> fragProvider.kill(context);
> }
> -
>
> @Override
> + public Iterator<ValueVector> iterator() {
> + return batchLoader.iterator();
> + }
> +
> + @Override
> public SelectionVector2 getSelectionVector2() {
> throw new UnsupportedOperationException();
> }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ExampleFilter.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ExampleFilter.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ExampleFilter.java
> deleted file mode 100644
> index 85f598f..0000000
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ExampleFilter.java
> +++ /dev/null
> @@ -1,111 +0,0 @@
>
> -/*******************************************************************************
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements. See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership. The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License. You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> -
> ******************************************************************************/
> -
> -package org.apache.drill.exec.physical.impl.filter;
> -
> -import org.apache.drill.common.expression.SchemaPath;
> -import org.apache.drill.exec.exception.SchemaChangeException;
> -import org.apache.drill.exec.ops.FragmentContext;
> -import org.apache.drill.exec.record.BatchSchema;
> -import org.apache.drill.exec.record.RecordBatch;
> -import org.apache.drill.exec.record.WritableBatch;
> -import org.apache.drill.exec.record.selection.SelectionVector2;
> -import org.apache.drill.exec.record.selection.SelectionVector4;
> -import org.apache.drill.exec.vector.ValueVector;
> -
> -public class ExampleFilter implements RecordBatch {
> - static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(ExampleFilter.class);
> -
> - //private EvalutationPredicates []
> - private RecordBatch incoming;
> - private BatchSchema outboundSchema;
> - private int recordCount;
> -
> - private void reconfigureSchema() throws SchemaChangeException {
> - BatchSchema in = incoming.getSchema();
> - outboundSchema =
> BatchSchema.newBuilder().addFields(in).setSelectionVectorMode(BatchSchema.SelectionVectorMode.TWO_BYTE).build();
> - }
> -
> - private int generateSelectionVector(){
> - return -1;
> - }
> -
> - @Override
> - public FragmentContext getContext() {
> - return incoming.getContext();
> - }
> -
> - @Override
> - public BatchSchema getSchema() {
> - return outboundSchema;
> - }
> -
> - @Override
> - public int getRecordCount() {
> - return recordCount; //To change body of implemented methods use File
> | Settings | File Templates.
> - }
> -
> - @Override
> - public void kill() {
> - //To change body of implemented methods use File | Settings | File
> Templates.
> - }
> -
> - @Override
> - public SelectionVector2 getSelectionVector2() {
> - return null;
> - }
> -
> - @Override
> - public SelectionVector4 getSelectionVector4() {
> - return null;
> - }
> -
> - @Override
> - public TypedFieldId getValueVectorId(SchemaPath path) {
> - return null;
> - }
> -
> - @Override
> - public <T extends ValueVector> T getValueVectorById(int fieldId,
> Class<?> vvClass) {
> - return null;
> - }
> -
> - @Override
> - public IterOutcome next() {
> - IterOutcome out = incoming.next();
> - switch (incoming.next()) {
> -
> - case NONE:
> - return IterOutcome.NONE;
> - case OK_NEW_SCHEMA:
> - //reconfigureSchema();
> - case OK:
> - this.recordCount = generateSelectionVector();
> - return out;
> - case STOP:
> - return IterOutcome.STOP;
> - default:
> - throw new UnsupportedOperationException();
> - }
> - }
> -
> - @Override
> - public WritableBatch getWritableBatch() {
> - return null; //To change body of implemented methods use File |
> Settings | File Templates.
> - }
> -}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
> new file mode 100644
> index 0000000..df2518b
> --- /dev/null
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
> @@ -0,0 +1,23 @@
> +package org.apache.drill.exec.physical.impl.filter;
> +
> +import java.util.List;
> +
> +import org.apache.drill.common.exceptions.ExecutionSetupException;
> +import org.apache.drill.exec.ops.FragmentContext;
> +import org.apache.drill.exec.physical.config.Filter;
> +import org.apache.drill.exec.physical.impl.BatchCreator;
> +import org.apache.drill.exec.record.RecordBatch;
> +
> +import com.google.common.base.Preconditions;
> +
> +public class FilterBatchCreator implements BatchCreator<Filter>{
> + static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(FilterBatchCreator.class);
> +
> + @Override
> + public RecordBatch getBatch(FragmentContext context, Filter config,
> List<RecordBatch> children) throws ExecutionSetupException {
> + Preconditions.checkArgument(children.size() == 1);
> + return new FilterRecordBatch(config, children.iterator().next(),
> context);
> + }
> +
> +
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterEvaluator.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterEvaluator.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterEvaluator.java
> new file mode 100644
> index 0000000..0fad224
> --- /dev/null
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterEvaluator.java
> @@ -0,0 +1,10 @@
> +package org.apache.drill.exec.physical.impl.filter;
> +
> +import org.apache.drill.exec.exception.SchemaChangeException;
> +import org.apache.drill.exec.ops.FragmentContext;
> +import org.apache.drill.exec.record.RecordBatch;
> +
> +public interface FilterEvaluator {
> + public void doSetup(FragmentContext context, RecordBatch incoming,
> RecordBatch outgoing) throws SchemaChangeException;
> + public boolean doEval(int inIndex, int outIndex);
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
> new file mode 100644
> index 0000000..fc9dbc6
> --- /dev/null
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
> @@ -0,0 +1,200 @@
> +package org.apache.drill.exec.physical.impl.filter;
> +
> +import java.io.IOException;
> +import java.util.Iterator;
> +import java.util.List;
> +
> +import org.apache.drill.common.expression.ErrorCollector;
> +import org.apache.drill.common.expression.ErrorCollectorImpl;
> +import org.apache.drill.common.expression.FieldReference;
> +import org.apache.drill.common.expression.LogicalExpression;
> +import org.apache.drill.common.expression.PathSegment;
> +import org.apache.drill.common.expression.SchemaPath;
> +import org.apache.drill.common.logical.data.NamedExpression;
> +import org.apache.drill.common.types.TypeProtos.MajorType;
> +import org.apache.drill.exec.exception.ClassTransformationException;
> +import org.apache.drill.exec.exception.SchemaChangeException;
> +import org.apache.drill.exec.expr.CodeGenerator;
> +import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
> +import org.apache.drill.exec.expr.ValueVectorReadExpression;
> +import org.apache.drill.exec.expr.ValueVectorWriteExpression;
> +import org.apache.drill.exec.ops.FragmentContext;
> +import org.apache.drill.exec.physical.config.Filter;
> +import org.apache.drill.exec.physical.config.Project;
> +import org.apache.drill.exec.physical.impl.VectorHolder;
> +import org.apache.drill.exec.physical.impl.project.Projector;
> +import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
> +import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
> +import org.apache.drill.exec.proto.SchemaDefProtos.NamePart.Type;
> +import org.apache.drill.exec.record.BatchSchema;
> +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
> +import org.apache.drill.exec.record.MaterializedField;
> +import org.apache.drill.exec.record.RecordBatch;
> +import org.apache.drill.exec.record.SchemaBuilder;
> +import org.apache.drill.exec.record.TransferPair;
> +import org.apache.drill.exec.record.WritableBatch;
> +import org.apache.drill.exec.record.selection.SelectionVector2;
> +import org.apache.drill.exec.record.selection.SelectionVector4;
> +import org.apache.drill.exec.vector.AllocationHelper;
> +import org.apache.drill.exec.vector.NonRepeatedMutator;
> +import org.apache.drill.exec.vector.TypeHelper;
> +import org.apache.drill.exec.vector.ValueVector;
> +
> +import com.google.common.base.Preconditions;
> +import com.google.common.collect.Lists;
> +
> +public class FilterRecordBatch implements RecordBatch{
> + static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
> +
> + private final Filter filterConfig;
> + private final RecordBatch incoming;
> + private final FragmentContext context;
> + private final SelectionVector2 sv;
> + private BatchSchema outSchema;
> + private Filterer filter;
> + private List<ValueVector> outputVectors;
> + private VectorHolder vh;
> +
> + public FilterRecordBatch(Filter pop, RecordBatch incoming,
> FragmentContext context){
> + this.filterConfig = pop;
> + this.incoming = incoming;
> + this.context = context;
> + sv = new SelectionVector2(context.getAllocator());
> + }
> +
> +
> + @Override
> + public FragmentContext getContext() {
> + return context;
> + }
> +
> + @Override
> + public BatchSchema getSchema() {
> + Preconditions.checkNotNull(outSchema);
> + return outSchema;
> + }
> +
> + @Override
> + public int getRecordCount() {
> + return sv.getCount();
> + }
> +
> + @Override
> + public void kill() {
> + incoming.kill();
> + }
> +
> +
> + @Override
> + public Iterator<ValueVector> iterator() {
> + return outputVectors.iterator();
> + }
> +
> + @Override
> + public SelectionVector2 getSelectionVector2() {
> + return sv;
> + }
> +
> + @Override
> + public SelectionVector4 getSelectionVector4() {
> + throw new UnsupportedOperationException();
> + }
> +
> + @Override
> + public TypedFieldId getValueVectorId(SchemaPath path) {
> + return vh.getValueVector(path);
> + }
> +
> + @Override
> + public <T extends ValueVector> T getValueVectorById(int fieldId,
> Class<?> clazz) {
> + return vh.getValueVector(fieldId, clazz);
> + }
> +
> + @Override
> + public IterOutcome next() {
> +
> + IterOutcome upstream = incoming.next();
> + logger.debug("Upstream... {}", upstream);
> + switch(upstream){
> + case NONE:
> + case NOT_YET:
> + case STOP:
> + return upstream;
> + case OK_NEW_SCHEMA:
> + try{
> + filter = createNewFilterer();
> + }catch(SchemaChangeException ex){
> + incoming.kill();
> + logger.error("Failure during query", ex);
> + context.fail(ex);
> + return IterOutcome.STOP;
> + }
> + // fall through.
> + case OK:
> + int recordCount = incoming.getRecordCount();
> + sv.allocateNew(recordCount);
> + filter.filterBatch(recordCount);
> + for(ValueVector v : this.outputVectors){
> + ValueVector.Mutator m = v.getMutator();
> + if(m instanceof NonRepeatedMutator){
> + ((NonRepeatedMutator) m).setValueCount(recordCount);
> + }else{
> + throw new UnsupportedOperationException();
> + }
> + }
> + return upstream; // change if upstream changed, otherwise normal.
> + default:
> + throw new UnsupportedOperationException();
> + }
> + }
> +
> +
> + private Filterer createNewFilterer() throws SchemaChangeException{
> + if(outputVectors != null){
> + for(ValueVector v : outputVectors){
> + v.close();
> + }
> + }
> + this.outputVectors = Lists.newArrayList();
> + this.vh = new VectorHolder(outputVectors);
> + LogicalExpression filterExpression = filterConfig.getExpr();
> + final ErrorCollector collector = new ErrorCollectorImpl();
> + final List<TransferPair> transfers = Lists.newArrayList();
> + final CodeGenerator<Filterer> cg = new
> CodeGenerator<Filterer>(Filterer.TEMPLATE_DEFINITION,
> context.getFunctionRegistry());
> +
> + final LogicalExpression expr =
> ExpressionTreeMaterializer.materialize(filterExpression, incoming,
> collector);
> + if(collector.hasErrors()){
> + throw new SchemaChangeException(String.format("Failure while trying
> to materialize incoming schema. Errors:\n %s.",
> collector.toErrorString()));
> + }
> +
> + cg.addExpr(new ReturnValueExpression(expr));
> +
> + for(ValueVector v : incoming){
> + TransferPair pair = v.getTransferPair();
> + outputVectors.add(pair.getTo());
> + transfers.add(pair);
> + }
> +
> + SchemaBuilder bldr =
> BatchSchema.newBuilder().setSelectionVectorMode(SelectionVectorMode.TWO_BYTE);
> + for(ValueVector v : outputVectors){
> + bldr.addField(v.getField());
> + }
> + this.outSchema = bldr.build();
> +
> + try {
> + TransferPair[] tx = transfers.toArray(new
> TransferPair[transfers.size()]);
> + Filterer filterer = context.getImplementationClass(cg);
> + filterer.setup(context, incoming, this, tx);
> + return filterer;
> + } catch (ClassTransformationException | IOException e) {
> + throw new SchemaChangeException("Failure while attempting to load
> generated class", e);
> + }
> + }
> +
> + @Override
> + public WritableBatch getWritableBatch() {
> + return WritableBatch.get(sv.getCount(), outputVectors);
> + }
> +
> +
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
> index 216bfec..4092911 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
> @@ -1,18 +1,27 @@
> package org.apache.drill.exec.physical.impl.filter;
>
> +import org.apache.drill.exec.exception.SchemaChangeException;
> +import org.apache.drill.exec.ops.FragmentContext;
> +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
> import org.apache.drill.exec.record.RecordBatch;
> +import org.apache.drill.exec.record.TransferPair;
> import org.apache.drill.exec.record.selection.SelectionVector2;
>
> -public abstract class FilterTemplate {
> +public abstract class FilterTemplate implements Filterer{
> static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(FilterTemplate.class);
>
> - SelectionVector2 outgoingSelectionVector;
> - SelectionVector2 incomingSelectionVector;
> + private SelectionVector2 outgoingSelectionVector;
> + private SelectionVector2 incomingSelectionVector;
> + private SelectionVectorMode svMode;
> + private TransferPair[] transfers;
>
> - public void setup(RecordBatch incoming, RecordBatch outgoing){
> - outgoingSelectionVector = outgoing.getSelectionVector2();
> -
> - switch(incoming.getSchema().getSelectionVector()){
> + @Override
> + public void setup(FragmentContext context, RecordBatch incoming,
> RecordBatch outgoing, TransferPair[] transfers) throws
> SchemaChangeException{
> + this.transfers = transfers;
> + this.outgoingSelectionVector = outgoing.getSelectionVector2();
> + this.svMode = incoming.getSchema().getSelectionVector();
> +
> + switch(svMode){
> case NONE:
> break;
> case TWO_BYTE:
> @@ -21,28 +30,54 @@ public abstract class FilterTemplate {
> default:
> throw new UnsupportedOperationException();
> }
> + doSetup(context, incoming, outgoing);
> + }
> +
> + private void doTransfers(){
> + for(TransferPair t : transfers){
> + t.transfer();
> + }
> }
>
> - public void filterBatchSV2(int recordCount){
> + public void filterBatch(int recordCount){
> + doTransfers();
> + switch(svMode){
> + case NONE:
> + filterBatchNoSV(recordCount);
> + break;
> + case TWO_BYTE:
> + filterBatchSV2(recordCount);
> + break;
> + default:
> + throw new UnsupportedOperationException();
> + }
> + }
> +
> + private void filterBatchSV2(int recordCount){
> int svIndex = 0;
> - for(char i =0; i < recordCount; i++){
> - if(include(i)){
> - outgoingSelectionVector.setIndex(svIndex, i);
> + final int count = recordCount*2;
> + for(int i = 0; i < count; i+=2){
> + char index = incomingSelectionVector.getIndex(i);
> + if(doEval(i, 0)){
> + outgoingSelectionVector.setIndex(svIndex, index);
> svIndex+=2;
> }
> }
> + outgoingSelectionVector.setRecordCount(svIndex/2);
> }
>
> - public void filterBatchNoSV(int recordCount){
> + private void filterBatchNoSV(int recordCount){
> int svIndex = 0;
> for(char i =0; i < recordCount; i++){
>
> - if(include(i)){
> + if(doEval(i, 0)){
> outgoingSelectionVector.setIndex(svIndex, i);
> svIndex+=2;
> }
> }
> + outgoingSelectionVector.setRecordCount(svIndex/2);
> }
>
> - protected abstract boolean include(int index);
> + protected abstract void doSetup(FragmentContext context, RecordBatch
> incoming, RecordBatch outgoing) throws SchemaChangeException;
> + protected abstract boolean doEval(int inIndex, int outIndex);
> }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
> new file mode 100644
> index 0000000..b270869
> --- /dev/null
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
> @@ -0,0 +1,19 @@
> +package org.apache.drill.exec.physical.impl.filter;
> +
> +import org.apache.drill.exec.compile.TemplateClassDefinition;
> +import org.apache.drill.exec.exception.SchemaChangeException;
> +import org.apache.drill.exec.ops.FragmentContext;
> +import org.apache.drill.exec.physical.impl.project.Projector;
> +import org.apache.drill.exec.record.RecordBatch;
> +import org.apache.drill.exec.record.TransferPair;
> +
> +public interface Filterer {
> + static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(Filterer.class);
> +
> + public void setup(FragmentContext context, RecordBatch incoming,
> RecordBatch outgoing, TransferPair[] transfers) throws
> SchemaChangeException;
> + public void filterBatch(int recordCount);
> +
> + public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION =
> new TemplateClassDefinition<Filterer>( //
> + Filterer.class,
> "org.apache.drill.exec.physical.impl.filter.FilterTemplate",
> FilterEvaluator.class, boolean.class);
> +
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ReturnValueExpression.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ReturnValueExpression.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ReturnValueExpression.java
> new file mode 100644
> index 0000000..a794d63
> --- /dev/null
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ReturnValueExpression.java
> @@ -0,0 +1,39 @@
> +package org.apache.drill.exec.physical.impl.filter;
> +
> +import org.apache.drill.common.expression.ExpressionPosition;
> +import org.apache.drill.common.expression.LogicalExpression;
> +import org.apache.drill.common.expression.visitors.ExprVisitor;
> +import org.apache.drill.common.types.Types;
> +import org.apache.drill.common.types.TypeProtos.MajorType;
> +
> +public class ReturnValueExpression implements LogicalExpression{
> + static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(ReturnValueExpression.class);
> +
> + private LogicalExpression child;
> +
> + public ReturnValueExpression(LogicalExpression child) {
> + this.child = child;
> + }
> +
> + public LogicalExpression getChild() {
> + return child;
> + }
> +
> + @Override
> + public MajorType getMajorType() {
> + return Types.NULL;
> + }
> +
> + @Override
> + public <T, V, E extends Exception> T accept(ExprVisitor<T, V, E>
> visitor, V value) throws E {
> + return visitor.visitUnknown(this, value);
> + }
> +
> + @Override
> + public ExpressionPosition getPosition() {
> + return ExpressionPosition.UNKNOWN;
> + }
> +
> +
> +
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/SelectionVectorPopulationExpression.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/SelectionVectorPopulationExpression.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/SelectionVectorPopulationExpression.java
> deleted file mode 100644
> index f253695..0000000
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/SelectionVectorPopulationExpression.java
> +++ /dev/null
> @@ -1,39 +0,0 @@
> -package org.apache.drill.exec.physical.impl.filter;
> -
> -import org.apache.drill.common.expression.ExpressionPosition;
> -import org.apache.drill.common.expression.LogicalExpression;
> -import org.apache.drill.common.expression.visitors.ExprVisitor;
> -import org.apache.drill.common.types.Types;
> -import org.apache.drill.common.types.TypeProtos.MajorType;
> -
> -public class SelectionVectorPopulationExpression implements
> LogicalExpression{
> - static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(SelectionVectorPopulationExpression.class);
> -
> - private LogicalExpression child;
> -
> - public SelectionVectorPopulationExpression(LogicalExpression child) {
> - this.child = child;
> - }
> -
> - public LogicalExpression getChild() {
> - return child;
> - }
> -
> - @Override
> - public MajorType getMajorType() {
> - return Types.NULL;
> - }
> -
> - @Override
> - public <T, V, E extends Exception> T accept(ExprVisitor<T, V, E>
> visitor, V value) throws E {
> - return visitor.visitUnknown(this, value);
> - }
> -
> - @Override
> - public ExpressionPosition getPosition() {
> - return ExpressionPosition.UNKNOWN;
> - }
> -
> -
> -
> -}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
> index 86caf28..5fd1fb4 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
> @@ -7,6 +7,6 @@ import org.apache.drill.exec.record.RecordBatch;
> public interface ProjectEvaluator {
> static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(ProjectEvaluator.class);
>
> - public abstract void setupEvaluators(FragmentContext context,
> RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
> - public abstract void doPerRecordWork(int inIndex, int outIndex);
> + public abstract void doSetup(FragmentContext context, RecordBatch
> incoming, RecordBatch outgoing) throws SchemaChangeException;
> + public abstract void doEval(int inIndex, int outIndex);
> }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
> index 3d1e3f7..060cd92 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
> @@ -1,6 +1,7 @@
> package org.apache.drill.exec.physical.impl.project;
>
> import java.io.IOException;
> +import java.util.Iterator;
> import java.util.List;
>
> import org.apache.drill.common.expression.ErrorCollector;
> @@ -59,7 +60,11 @@ public class ProjectRecordBatch implements RecordBatch{
> this.context = context;
> }
>
> -
> + @Override
> + public Iterator<ValueVector> iterator() {
> + return outputVectors.iterator();
> + }
> +
> @Override
> public FragmentContext getContext() {
> return context;
> @@ -180,7 +185,7 @@ public class ProjectRecordBatch implements RecordBatch{
> allocationVectors.add(vector);
> outputVectors.add(vector);
> ValueVectorWriteExpression write = new
> ValueVectorWriteExpression(outputVectors.size() - 1, expr);
> - cg.addNextWrite(write);
> + cg.addExpr(write);
> }
>
> }
> @@ -192,7 +197,7 @@ public class ProjectRecordBatch implements RecordBatch{
> this.outSchema = bldr.build();
>
> try {
> - Projector projector =
> context.getImplementationClass(Projector.TEMPLATE_DEFINITION, cg);
> + Projector projector = context.getImplementationClass(cg);
> projector.setup(context, incoming, this, transfers);
> return projector;
> } catch (ClassTransformationException | IOException e) {
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
> index 2787f0c..0d1e201 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
> @@ -16,6 +16,6 @@ public interface Projector {
> public abstract int projectRecords(int recordCount, int
> firstOutputIndex);
>
> public static TemplateClassDefinition<Projector> TEMPLATE_DEFINITION =
> new TemplateClassDefinition<Projector>( //
> - Projector.class,
> "org.apache.drill.exec.physical.impl.project.ProjectorTemplate",
> ProjectEvaluator.class, "setupEvaluators", "doPerRecordWork");
> + Projector.class,
> "org.apache.drill.exec.physical.impl.project.ProjectorTemplate",
> ProjectEvaluator.class, null);
>
> }
> \ No newline at end of file
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
> index 486c7b0..735d355 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
> @@ -33,7 +33,7 @@ public abstract class ProjectorTemplate implements
> Projector {
> case TWO_BYTE:
> final int count = recordCount*2;
> for(int i = 0; i < count; i+=2, firstOutputIndex++){
> - doPerRecordWork(vector2.getIndex(i), firstOutputIndex);
> + doEval(vector2.getIndex(i), firstOutputIndex);
> }
> return recordCount;
>
> @@ -45,7 +45,7 @@ public abstract class ProjectorTemplate implements
> Projector {
> }
> final int countN = recordCount;
> for (int i = 0; i < countN; i++, firstOutputIndex++) {
> - doPerRecordWork(i, firstOutputIndex);
> + doEval(i, firstOutputIndex);
> }
> return recordCount;
>
> @@ -68,11 +68,11 @@ public abstract class ProjectorTemplate implements
> Projector {
> break;
> }
> this.transfers = ImmutableList.copyOf(transfers);
> - setupEvaluators(context, incoming, outgoing);
> + setupEval(context, incoming, outgoing);
> }
>
> - protected abstract void setupEvaluators(FragmentContext context,
> RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
> - protected abstract void doPerRecordWork(int inIndex, int outIndex);
> + protected abstract void setupEval(FragmentContext context, RecordBatch
> incoming, RecordBatch outgoing) throws SchemaChangeException;
> + protected abstract void doEval(int inIndex, int outIndex);
>
>
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
> index 650a148..ff856d4 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
> @@ -29,7 +29,7 @@ import org.apache.drill.exec.vector.ValueVector;
> * composed of ValueVectors, ideally a batch fits within L2 cache (~256k
> per core). The set of value vectors do not
> * change unless the next() IterOutcome is a *_NEW_SCHEMA type.
> */
> -public interface RecordBatch {
> +public interface RecordBatch extends Iterable<ValueVector>{
>
> /**
> * Describes the outcome of a RecordBatch being incremented forward.
> @@ -88,6 +88,7 @@ public interface RecordBatch {
> public abstract TypedFieldId getValueVectorId(SchemaPath path);
>
>
> +
> public abstract <T extends ValueVector> T getValueVectorById(int
> fieldId, Class<?> clazz);
>
> /**
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
> index e2a1648..5f7648b 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
> @@ -33,9 +33,9 @@ import
> org.apache.drill.exec.record.RecordBatch.TypedFieldId;
> import org.apache.drill.exec.vector.TypeHelper;
> import org.apache.drill.exec.vector.ValueVector;
>
> -import com.beust.jcommander.internal.Lists;
> -import com.beust.jcommander.internal.Maps;
> import com.google.common.collect.ImmutableList;
> +import com.google.common.collect.Lists;
> +import com.google.common.collect.Maps;
>
> public class RecordBatchLoader implements Iterable<ValueVector>{
> static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(RecordBatchLoader.class);
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
> index 0989c1d..34e4043 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
> @@ -23,8 +23,8 @@ import java.util.Set;
> import org.apache.drill.exec.exception.SchemaChangeException;
> import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
>
> -import com.beust.jcommander.internal.Sets;
> import com.google.common.collect.Lists;
> +import com.google.common.collect.Sets;
>
> /**
> * A reusable builder that supports the creation of BatchSchemas. Can
> have a supporting expected object. If the expected Schema object is
> defined, the
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
> index cdc136e..88f0c79 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
> @@ -19,16 +19,20 @@ package org.apache.drill.exec.record.selection;
>
> import io.netty.buffer.ByteBuf;
>
> +import java.io.Closeable;
> +import java.io.IOException;
> +
> import org.apache.drill.exec.memory.BufferAllocator;
> import org.apache.drill.exec.record.DeadBuf;
>
> /**
> * A selection vector that fronts, at most, a
> */
> -public class SelectionVector2{
> +public class SelectionVector2 implements Closeable{
> static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(SelectionVector2.class);
>
> private final BufferAllocator allocator;
> + private int recordCount;
> private ByteBuf buffer = DeadBuf.DEAD_BUFFER;
>
> public SelectionVector2(BufferAllocator allocator) {
> @@ -36,14 +40,40 @@ public class SelectionVector2{
> }
>
> public int getCount(){
> - return -1;
> + return recordCount;
> }
>
> - public int getIndex(int directIndex){
> + public char getIndex(int directIndex){
> return buffer.getChar(directIndex);
> }
>
> public void setIndex(int directIndex, char value){
> buffer.setChar(directIndex, value);
> }
> +
> + public void allocateNew(int size){
> + clear();
> + buffer = allocator.buffer(size * 2);
> + }
> +
> +
> + public void clear() {
> + if (buffer != DeadBuf.DEAD_BUFFER) {
> + buffer.release();
> + buffer = DeadBuf.DEAD_BUFFER;
> + recordCount = 0;
> + }
> + }
> +
> + public void setRecordCount(int recordCount){
> + logger.debug("Seting record count to {}", recordCount);
> + this.recordCount = recordCount;
> + }
> +
> + @Override
> + public void close() throws IOException {
> + clear();
> + }
> +
> +
> }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
> index 8513dfe..3a57410 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
> @@ -39,13 +39,14 @@ import
> org.apache.drill.exec.vector.NullableVarChar4Vector;
> import org.apache.drill.exec.vector.TypeHelper;
> import org.apache.drill.exec.vector.ValueVector;
>
> -import com.beust.jcommander.internal.Maps;
> +
> import com.fasterxml.jackson.core.JsonFactory;
> import com.fasterxml.jackson.core.JsonParser;
> import com.fasterxml.jackson.core.JsonToken;
> import com.google.common.base.Charsets;
> import com.google.common.collect.Iterables;
> import com.google.common.collect.Lists;
> +import com.google.common.collect.Maps;
> import com.google.common.io.Files;
> import com.google.common.io.InputSupplier;
> import com.google.common.io.Resources;
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
> index cb1e1d6..d2889ed 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
> +++
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
> @@ -40,7 +40,7 @@ public class TestClassTransformation {
>
> TemplateClassDefinition<ExampleExternalInterface> def = new
> TemplateClassDefinition<ExampleExternalInterface>(
> ExampleExternalInterface.class,
> "org.apache.drill.exec.compile.ExampleTemplate",
> - ExampleInternalInterface.class, "a", "b");
> + ExampleInternalInterface.class, null);
>
>
> ClassTransformer ct = new ClassTransformer();
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
> index 623af0e..c610374 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
> +++
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
> @@ -92,7 +92,7 @@ public class ExpressionTest {
> }
>
> CodeGenerator<Projector> cg = new
> CodeGenerator<Projector>(Projector.TEMPLATE_DEFINITION, new
> FunctionImplementationRegistry(DrillConfig.create()));
> - cg.addNextWrite(new ValueVectorWriteExpression(-1, materializedExpr));
> + cg.addExpr(new ValueVectorWriteExpression(-1, materializedExpr));
> return cg.generate();
> }
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
> index d125ec0..c6434f7 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
> +++
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
> @@ -5,6 +5,7 @@ import
> org.apache.drill.exec.physical.impl.ScreenCreator.ScreenRoot;
> import org.apache.drill.exec.record.RecordBatch;
> import org.apache.drill.exec.record.RecordBatch.IterOutcome;
> import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
> +import org.apache.drill.exec.record.selection.SelectionVector2;
> import org.apache.drill.exec.vector.ValueVector;
>
> public class SimpleRootExec implements RootExec{
> @@ -21,6 +22,9 @@ public class SimpleRootExec implements RootExec{
>
> }
>
> + public SelectionVector2 getSelectionVector2(){
> + return incoming.getSelectionVector2();
> + }
>
> public <T extends ValueVector> T getValueVectorById(SchemaPath path,
> Class<?> vvClass){
> TypedFieldId tfid = incoming.getValueVectorId(path);
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
> new file mode 100644
> index 0000000..df11aa7
> --- /dev/null
> +++
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
> @@ -0,0 +1,58 @@
> +package org.apache.drill.exec.physical.impl.filter;
> +
> +import mockit.Injectable;
> +import mockit.NonStrictExpectations;
> +
> +import org.apache.drill.common.config.DrillConfig;
> +import org.apache.drill.common.util.FileUtils;
> +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
> +import org.apache.drill.exec.memory.BufferAllocator;
> +import org.apache.drill.exec.ops.FragmentContext;
> +import org.apache.drill.exec.physical.PhysicalPlan;
> +import org.apache.drill.exec.physical.base.FragmentRoot;
> +import org.apache.drill.exec.physical.impl.ImplCreator;
> +import org.apache.drill.exec.physical.impl.SimpleRootExec;
> +import org.apache.drill.exec.planner.PhysicalPlanReader;
> +import org.apache.drill.exec.proto.CoordinationProtos;
> +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
> +import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
> +import org.apache.drill.exec.server.DrillbitContext;
> +import org.junit.After;
> +import org.junit.Test;
> +
> +import com.google.common.base.Charsets;
> +import com.google.common.io.Files;
> +import com.yammer.metrics.MetricRegistry;
> +
> +public class TestSimpleFilter {
> + static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(TestSimpleFilter.class);
> + DrillConfig c = DrillConfig.create();
> +
> +
> + @Test
> + public void project(@Injectable final DrillbitContext bitContext,
> @Injectable UserClientConnection connection) throws Exception{
> + System.out.println(System.getProperty("java.class.path"));
> +
> +
> + new NonStrictExpectations(){{
> + bitContext.getMetrics(); result = new MetricRegistry("test");
> + bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
> + }};
> +
> +
> + PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(),
> CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
> + PhysicalPlan plan =
> reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/filter/test1.json"),
> Charsets.UTF_8));
> + FunctionImplementationRegistry registry = new
> FunctionImplementationRegistry(c);
> + FragmentContext context = new FragmentContext(bitContext,
> FragmentHandle.getDefaultInstance(), connection, null, registry);
> + SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context,
> (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
> + while(exec.next()){
> + System.out.println(exec.getSelectionVector2().getCount());
> + }
> + }
> +
> + @After
> + public void tearDown() throws Exception{
> + // pause to get logger to catch up.
> + Thread.sleep(1000);
> + }
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
> index 925faf7..68b8881 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
> +++
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
> @@ -31,7 +31,7 @@ import
> org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
> import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
> import org.junit.Test;
>
> -import com.beust.jcommander.internal.Lists;
> +import com.google.common.collect.Lists;
> import com.google.common.collect.Range;
>
> public class ExpressionTreeMaterializerTest {
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
> index 7c9e8f4..4a0358e 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
> +++
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
> @@ -24,7 +24,8 @@ import org.apache.drill.exec.vector.ValueVector;
> import org.junit.Ignore;
> import org.junit.Test;
>
> -import com.beust.jcommander.internal.Lists;
> +import com.google.common.collect.Lists;
> +
>
> public class JSONRecordReaderTest {
> private static final Charset UTF_8 = Charset.forName("UTF-8");
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/65e2cfce/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json
> b/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json
> new file mode 100644
> index 0000000..a892c70
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/test/resources/filter/test1.json
> @@ -0,0 +1,34 @@
> +{
> + head:{
> + type:"APACHE_DRILL_PHYSICAL",
> + version:"1",
> + generator:{
> + type:"manual"
> + }
> + },
> + graph:[
> + {
> + @id:1,
> + pop:"mock-scan",
> + url: "http://apache.org",
> + entries:[
> + {records: 100, types: [
> + {name: "blue", type: "INT", mode: "REQUIRED"},
> + {name: "red", type: "BIGINT", mode: "REQUIRED"},
> + {name: "green", type: "INT", mode: "REQUIRED"}
> + ]}
> + ]
> + },
> + {
> + @id:2,
> + child: 1,
> + pop:"filter",
> + expr: "true"
> + },
> + {
> + @id: 3,
> + child: 2,
> + pop: "screen"
> + }
> + ]
> +}
> \ No newline at end of file
>
>