You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/12/02 05:23:51 UTC

[04/10] git commit: DRILL-313: Fix for Limit operator only transferring buffers on new schema

DRILL-313: Fix for Limit operator only transferring buffers on new schema


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/6c0389f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/6c0389f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/6c0389f3

Branch: refs/heads/master
Commit: 6c0389f394a789beb74103309f5bed13ddeccf95
Parents: b91f2e8
Author: Steven Phillips <sp...@maprtech.com>
Authored: Sun Dec 1 19:48:57 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Dec 1 19:48:57 2013 -0800

----------------------------------------------------------------------
 .../physical/impl/limit/LimitRecordBatch.java   |  8 ++--
 .../drill/exec/fn/impl/GeneratorFunctions.java  | 22 ++++++++-
 .../physical/impl/limit/TestSimpleLimit.java    | 35 ++++++++++++++
 .../src/test/resources/limit/test4.json         | 49 ++++++++++++++++++++
 4 files changed, 109 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c0389f3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 8390997..712af9f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -36,6 +36,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   private int recordsLeft;
   private boolean noEndLimit;
   private boolean skipBatch;
+  List<TransferPair> transfers = Lists.newArrayList();
 
   public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming) {
     super(popConfig, context, incoming);
@@ -52,7 +53,6 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   protected void setupNewSchema() throws SchemaChangeException {
     container.clear();
 
-    List<TransferPair> transfers = Lists.newArrayList();
 
     for(VectorWrapper<?> v : incoming){
       TransferPair pair = v.getValueVector().getTransferPair();
@@ -74,9 +74,6 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
 
     container.buildSchema(BatchSchema.SelectionVectorMode.TWO_BYTE);
 
-    for(TransferPair tp : transfers) {
-      tp.transfer();
-    }
   }
 
   @Override
@@ -96,6 +93,9 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
 
   @Override
   protected void doWork() {
+    for(TransferPair tp : transfers) {
+      tp.transfer();
+    }
     skipBatch = false;
     int recordCount = incoming.getRecordCount();
     if(recordCount <= recordsToSkip) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c0389f3/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java
index d12633e..b79ccd0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.expr.annotations.FunctionTemplate;
 import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
 import org.apache.drill.exec.expr.annotations.Output;
 import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.annotations.Workspace;
 import org.apache.drill.exec.expr.holders.*;
 import org.apache.drill.exec.record.RecordBatch;
 
@@ -38,17 +39,36 @@ public class GeneratorFunctions {
           OutputTypeDeterminer.FIXED_BIGINT, "randomBigInt");
   public static final FunctionDefinition RANDOM_FLOAT8 = FunctionDefinition.simple("randomFloat8", new ArgumentValidators.NumericTypeAllowed(1,2, true),
           OutputTypeDeterminer.FIXED_FLOAT8, "randomFloat8");
+  public static final FunctionDefinition INCREASING_BIGINT = FunctionDefinition.simple("increasingBigInt", new ArgumentValidators.NumericTypeAllowed(1, true),
+          OutputTypeDeterminer.FIXED_BIGINT, "increasingBigInt");
 
   public static class Provider implements CallProvider {
 
     @Override
     public FunctionDefinition[] getFunctionDefintions() {
       return new FunctionDefinition[] { RANDOM_BIG_INT,
-                                        RANDOM_FLOAT8 };
+                                        RANDOM_FLOAT8,
+                                        INCREASING_BIGINT };
     }
 
   }
 
+  @FunctionTemplate(name = "increasingBigInt", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
+  public static class IncreasingBigInt implements DrillSimpleFunc {
+
+    @Param BigIntHolder start;
+    @Workspace long current;
+    @Output BigIntHolder out;
+
+    public void setup(RecordBatch incoming) {
+      current = 0;
+    }
+
+    public void eval() {
+      out.value = start.value + current++;
+    }
+  }
+
   @FunctionTemplate(name = "randomBigInt", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
   public static class RandomBigIntGauss implements DrillSimpleFunc {
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c0389f3/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
index b254fc0..1ee9ceb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
@@ -38,8 +38,11 @@ import org.apache.drill.exec.physical.impl.SimpleRootExec;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.ExecProtos;
+import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.user.UserServer;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.ValueVector;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -79,6 +82,13 @@ public class TestSimpleLimit {
     }};
 
     verifyLimitCount(bitContext, connection, "test2.json", 69999);
+    long start = 30000;
+    long end = 100000;
+    long expectedSum = (end - start) * (end + start - 1) / 2; //Formula for sum of series
+   
+    verifySum(bitContext, connection, "test4.json", 70000, expectedSum);
+    
+
   }
 
   private void verifyLimitCount(DrillbitContext bitContext, UserServer.UserClientConnection connection, String testPlan, int expectedCount) throws Throwable {
@@ -99,4 +109,29 @@ public class TestSimpleLimit {
     }
     assertTrue(!context.isFailed());
   }
+
+  private void verifySum(DrillbitContext bitContext, UserServer.UserClientConnection connection, String testPlan, int expectedCount, long expectedSum) throws Throwable {
+    PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+    PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/limit/" + testPlan), Charsets.UTF_8));
+    FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+    SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+    int recordCount = 0;
+    long sum = 0;
+    while(exec.next()){
+      recordCount += exec.getRecordCount();
+      BigIntVector v = (BigIntVector) exec.iterator().next();
+      for (int i = 0; i < v.getAccessor().getValueCount(); i++) {
+        sum += v.getAccessor().get(i);
+      }
+    }
+
+    assertEquals(expectedCount, recordCount);
+    assertEquals(expectedSum, sum);
+
+    if(context.getFailureCause() != null){
+      throw context.getFailureCause();
+    }
+    assertTrue(!context.isFailed());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c0389f3/exec/java-exec/src/test/resources/limit/test4.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/limit/test4.json b/exec/java-exec/src/test/resources/limit/test4.json
new file mode 100644
index 0000000..b7793b1
--- /dev/null
+++ b/exec/java-exec/src/test/resources/limit/test4.json
@@ -0,0 +1,49 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+	graph:[
+        {
+            @id:1,
+            pop:"mock-sub-scan",
+            url: "http://apache.org",
+            entries:[
+            	{records: 100000000, types: [
+            	  {name: "blue", type: "INT", mode: "REQUIRED"},
+            	  {name: "red", type: "BIGINT", mode: "REQUIRED"},
+            	  {name: "green", type: "INT", mode: "REQUIRED"}
+            	]}
+            ]
+        },
+        {
+           @id:2,
+           child: 1,
+           pop:"project",
+           exprs: [
+             { ref: "col1", expr:"increasingBigInt(0)"}
+           ]
+        },
+        {
+            @id:3,
+            child: 2,
+            pop:"limit",
+            first:30000,
+            last:100000
+        },
+        {
+          @id:4,
+          child:3,
+          pop: "selection-vector-remover"
+
+        },
+        {
+            @id: 5,
+            child: 4,
+            pop: "screen"
+        }
+    ]
+}
\ No newline at end of file