You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/12/02 05:23:51 UTC
[04/10] git commit: DRILL-313: Fix for Limit operator only transferring buffers on new schema
DRILL-313: Fix for Limit operator only transferring buffers on new schema
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/6c0389f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/6c0389f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/6c0389f3
Branch: refs/heads/master
Commit: 6c0389f394a789beb74103309f5bed13ddeccf95
Parents: b91f2e8
Author: Steven Phillips <sp...@maprtech.com>
Authored: Sun Dec 1 19:48:57 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Dec 1 19:48:57 2013 -0800
----------------------------------------------------------------------
.../physical/impl/limit/LimitRecordBatch.java | 8 ++--
.../drill/exec/fn/impl/GeneratorFunctions.java | 22 ++++++++-
.../physical/impl/limit/TestSimpleLimit.java | 35 ++++++++++++++
.../src/test/resources/limit/test4.json | 49 ++++++++++++++++++++
4 files changed, 109 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c0389f3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 8390997..712af9f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -36,6 +36,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
private int recordsLeft;
private boolean noEndLimit;
private boolean skipBatch;
+ List<TransferPair> transfers = Lists.newArrayList();
public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming) {
super(popConfig, context, incoming);
@@ -52,7 +53,6 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
protected void setupNewSchema() throws SchemaChangeException {
container.clear();
- List<TransferPair> transfers = Lists.newArrayList();
for(VectorWrapper<?> v : incoming){
TransferPair pair = v.getValueVector().getTransferPair();
@@ -74,9 +74,6 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
container.buildSchema(BatchSchema.SelectionVectorMode.TWO_BYTE);
- for(TransferPair tp : transfers) {
- tp.transfer();
- }
}
@Override
@@ -96,6 +93,9 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
@Override
protected void doWork() {
+ for(TransferPair tp : transfers) {
+ tp.transfer();
+ }
skipBatch = false;
int recordCount = incoming.getRecordCount();
if(recordCount <= recordsToSkip) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c0389f3/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java
index d12633e..b79ccd0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.expr.annotations.FunctionTemplate;
import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
import org.apache.drill.exec.expr.annotations.Output;
import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.annotations.Workspace;
import org.apache.drill.exec.expr.holders.*;
import org.apache.drill.exec.record.RecordBatch;
@@ -38,17 +39,36 @@ public class GeneratorFunctions {
OutputTypeDeterminer.FIXED_BIGINT, "randomBigInt");
public static final FunctionDefinition RANDOM_FLOAT8 = FunctionDefinition.simple("randomFloat8", new ArgumentValidators.NumericTypeAllowed(1,2, true),
OutputTypeDeterminer.FIXED_FLOAT8, "randomFloat8");
+ public static final FunctionDefinition INCREASING_BIGINT = FunctionDefinition.simple("increasingBigInt", new ArgumentValidators.NumericTypeAllowed(1, true),
+ OutputTypeDeterminer.FIXED_BIGINT, "increasingBigInt");
public static class Provider implements CallProvider {
@Override
public FunctionDefinition[] getFunctionDefintions() {
return new FunctionDefinition[] { RANDOM_BIG_INT,
- RANDOM_FLOAT8 };
+ RANDOM_FLOAT8,
+ INCREASING_BIGINT };
}
}
+ @FunctionTemplate(name = "increasingBigInt", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
+ public static class IncreasingBigInt implements DrillSimpleFunc {
+
+ @Param BigIntHolder start;
+ @Workspace long current;
+ @Output BigIntHolder out;
+
+ public void setup(RecordBatch incoming) {
+ current = 0;
+ }
+
+ public void eval() {
+ out.value = start.value + current++;
+ }
+ }
+
@FunctionTemplate(name = "randomBigInt", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
public static class RandomBigIntGauss implements DrillSimpleFunc {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c0389f3/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
index b254fc0..1ee9ceb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
@@ -38,8 +38,11 @@ import org.apache.drill.exec.physical.impl.SimpleRootExec;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos;
+import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.user.UserServer;
import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.ValueVector;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -79,6 +82,13 @@ public class TestSimpleLimit {
}};
verifyLimitCount(bitContext, connection, "test2.json", 69999);
+ long start = 30000;
+ long end = 100000;
+ long expectedSum = (end - start) * (end + start - 1) / 2; //Formula for sum of series
+
+ verifySum(bitContext, connection, "test4.json", 70000, expectedSum);
+
+
}
private void verifyLimitCount(DrillbitContext bitContext, UserServer.UserClientConnection connection, String testPlan, int expectedCount) throws Throwable {
@@ -99,4 +109,29 @@ public class TestSimpleLimit {
}
assertTrue(!context.isFailed());
}
+
+ private void verifySum(DrillbitContext bitContext, UserServer.UserClientConnection connection, String testPlan, int expectedCount, long expectedSum) throws Throwable {
+ PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+ PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/limit/" + testPlan), Charsets.UTF_8));
+ FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+ FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+ SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+ int recordCount = 0;
+ long sum = 0;
+ while(exec.next()){
+ recordCount += exec.getRecordCount();
+ BigIntVector v = (BigIntVector) exec.iterator().next();
+ for (int i = 0; i < v.getAccessor().getValueCount(); i++) {
+ sum += v.getAccessor().get(i);
+ }
+ }
+
+ assertEquals(expectedCount, recordCount);
+ assertEquals(expectedSum, sum);
+
+ if(context.getFailureCause() != null){
+ throw context.getFailureCause();
+ }
+ assertTrue(!context.isFailed());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c0389f3/exec/java-exec/src/test/resources/limit/test4.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/limit/test4.json b/exec/java-exec/src/test/resources/limit/test4.json
new file mode 100644
index 0000000..b7793b1
--- /dev/null
+++ b/exec/java-exec/src/test/resources/limit/test4.json
@@ -0,0 +1,49 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"mock-sub-scan",
+ url: "http://apache.org",
+ entries:[
+ {records: 100000000, types: [
+ {name: "blue", type: "INT", mode: "REQUIRED"},
+ {name: "red", type: "BIGINT", mode: "REQUIRED"},
+ {name: "green", type: "INT", mode: "REQUIRED"}
+ ]}
+ ]
+ },
+ {
+ @id:2,
+ child: 1,
+ pop:"project",
+ exprs: [
+ { ref: "col1", expr:"increasingBigInt(0)"}
+ ]
+ },
+ {
+ @id:3,
+ child: 2,
+ pop:"limit",
+ first:30000,
+ last:100000
+ },
+ {
+ @id:4,
+ child:3,
+ pop: "selection-vector-remover"
+
+ },
+ {
+ @id: 5,
+ child: 4,
+ pop: "screen"
+ }
+ ]
+}
\ No newline at end of file