You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2015/12/14 08:27:57 UTC
[1/2] drill git commit: DRILL-4163 Schema changes support in MergeJoin Operator.
Repository: drill
Updated Branches:
refs/heads/master bb3fc1521 -> e529df460
DRILL-4163 Schema changes support in MergeJoin Operator.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/cc9175c1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/cc9175c1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/cc9175c1
Branch: refs/heads/master
Commit: cc9175c13270660ffd9ec2ddcbc70780dd72dada
Parents: bb3fc15
Author: Amit Hadke <am...@gmail.com>
Authored: Fri Dec 4 16:38:36 2015 -0800
Committer: Steven Phillips <sm...@apache.org>
Committed: Sun Dec 13 23:22:55 2015 -0800
----------------------------------------------------------------------
.../exec/physical/impl/join/JoinTemplate.java | 2 +-
.../exec/physical/impl/join/JoinUtils.java | 3 +
.../exec/physical/impl/join/MergeJoinBatch.java | 11 +-
.../drill/exec/record/VectorContainer.java | 5 +-
.../join/TestMergeJoinWithSchemaChanges.java | 348 +++++++++++++++++++
5 files changed, 360 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/cc9175c1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
index 40c47b3..43cbf71 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -60,7 +60,7 @@ public abstract class JoinTemplate implements JoinWorker {
if (status.left.finished()) {
return true;
}
- final int comparison = doCompare(status.left.getCurrentPosition(), status.right.getCurrentPosition());
+ final int comparison = Integer.signum(doCompare(status.left.getCurrentPosition(), status.right.getCurrentPosition()));
switch (comparison) {
case -1:
// left key < right key
http://git-wip-us.apache.org/repos/asf/drill/blob/cc9175c1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
index 2476a83..61640bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
@@ -174,6 +174,9 @@ public class JoinUtils {
TypeProtos.MinorType rightType = rightExpression.getMajorType().getMinorType();
TypeProtos.MinorType leftType = leftExpression.getMajorType().getMinorType();
+ if (rightType == TypeProtos.MinorType.UNION || leftType == TypeProtos.MinorType.UNION) {
+ continue;
+ }
if (rightType != leftType) {
// currently we only support implicit casts if the input types are numeric or varchar/varbinary
http://git-wip-us.apache.org/repos/asf/drill/blob/cc9175c1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index edafbfc..9ef5cde 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -102,6 +102,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
private JoinWorker worker;
private boolean areNullsEqual = false; // whether nulls compare equal
+
private static final String LEFT_INPUT = "LEFT INPUT";
private static final String RIGHT_INPUT = "RIGHT INPUT";
@@ -381,13 +382,11 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
}
private void allocateBatch(boolean newSchema) {
- // allocate new batch space.
- container.zeroVectors();
-
boolean leftAllowed = status.getLeftStatus() != IterOutcome.NONE;
boolean rightAllowed = status.getRightStatus() != IterOutcome.NONE;
if (newSchema) {
+ container.clear();
// add fields from both batches
if (leftAllowed) {
for (VectorWrapper<?> w : leftIterator) {
@@ -423,6 +422,8 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
}
}
}
+ } else {
+ container.zeroVectors();
}
for (VectorWrapper w : container) {
AllocationHelper.allocateNew(w.getValueVector(), Character.MAX_VALUE);
@@ -477,9 +478,9 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
private LogicalExpression materializeExpression(LogicalExpression expression, IterOutcome lastStatus,
VectorAccessible input, ErrorCollector collector) throws ClassTransformationException {
- LogicalExpression materializedExpr = null;
+ LogicalExpression materializedExpr;
if (lastStatus != IterOutcome.NONE) {
- materializedExpr = ExpressionTreeMaterializer.materialize(expression, input, collector, context.getFunctionRegistry());
+ materializedExpr = ExpressionTreeMaterializer.materialize(expression, input, collector, context.getFunctionRegistry(), unionTypeEnabled);
} else {
materializedExpr = new TypedNullConstant(Types.optional(MinorType.INT));
}
http://git-wip-us.apache.org/repos/asf/drill/blob/cc9175c1/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 815e2d8..ccc05ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -29,7 +29,6 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
import org.apache.drill.exec.vector.ValueVector;
@@ -282,8 +281,8 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
if (fieldIds.length == 1 && clazz != null && !clazz.isAssignableFrom(va.getVectorClass())) {
throw new IllegalStateException(String.format(
- "Failure while reading vector. Expected vector class of %s but was holding vector class %s.",
- clazz.getCanonicalName(), va.getVectorClass().getCanonicalName()));
+ "Failure while reading vector. Expected vector class of %s but was holding vector class %s, field= %s ",
+ clazz.getCanonicalName(), va.getVectorClass().getCanonicalName(), va.getField()));
}
return va.getChildWrapper(fieldIds);
http://git-wip-us.apache.org/repos/asf/drill/blob/cc9175c1/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinWithSchemaChanges.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinWithSchemaChanges.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinWithSchemaChanges.java
new file mode 100644
index 0000000..08aae60
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinWithSchemaChanges.java
@@ -0,0 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.TestBuilder;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+public class TestMergeJoinWithSchemaChanges extends BaseTestQuery {
+
+ @Test
+ //@Ignore
+ public void testNumericTypes() throws Exception {
+ final File left_dir = new File(BaseTestQuery.getTempDir("mergejoin-schemachanges-left"));
+ final File right_dir = new File(BaseTestQuery.getTempDir("mergejoin-schemachanges-right"));
+ left_dir.mkdirs();
+ right_dir.mkdirs();
+
+ // First create data for numeric types.
+ // left side int and float vs right side int and float
+ BufferedWriter writer = new BufferedWriter(new FileWriter(new File(left_dir, "l1.json")));
+ for (int i = 0; i < 5000; ++i) {
+ writer.write(String.format("{ \"kl\" : %d , \"vl\": %d }\n", i, i));
+ }
+ writer.close();
+ writer = new BufferedWriter(new FileWriter(new File(left_dir, "l2.json")));
+ for (int i = 1000; i < 6000; ++i) {
+ writer.write(String.format("{ \"kl\" : %f , \"vl\": %f }\n", (float) i, (float) i));
+ }
+ writer.close();
+
+ // right side is int and float
+ writer = new BufferedWriter(new FileWriter(new File(right_dir, "r1.json")));
+ for (int i = 2000; i < 7000; ++i) {
+ writer.write(String.format("{ \"kr\" : %d , \"vr\": %d }\n", i, i));
+ }
+ writer.close();
+ writer = new BufferedWriter(new FileWriter(new File(right_dir, "r2.json")));
+ for (int i = 3000; i < 8000; ++i) {
+ writer.write(String.format("{ \"kr\" : %f, \"vr\": %f }\n", (float) i, (float) i));
+ }
+ writer.close();
+
+ // INNER JOIN
+ String query = String.format("select * from dfs_test.`%s` L %s join dfs_test.`%s` R on L.kl=R.kr",
+ left_dir.toPath().toString(), "inner", right_dir.toPath().toString());
+
+ TestBuilder builder = testBuilder()
+ .sqlQuery(query)
+ .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false; alter session set `exec.enable_union_type` = true")
+ .unOrdered()
+ .baselineColumns("kl", "vl", "kr", "vr");
+
+
+ for (long i = 2000; i < 3000; ++i) {
+ builder.baselineValues(i, i, i, i);
+ builder.baselineValues((double)i, (double)i, i, i);
+ }
+ for (long i = 3000; i < 5000; ++i) {
+ builder.baselineValues(i, i, i, i);
+ builder.baselineValues(i, i, (double)i, (double)i);
+ builder.baselineValues((double)i, (double)i, i, i);
+ builder.baselineValues((double)i, (double)i, (double)i, (double)i);
+ }
+ for (long i = 5000; i < 6000; ++i) {
+ builder.baselineValues((double)i, (double)i, i, i);
+ builder.baselineValues((double) i, (double) i, (double) i, (double) i);
+ }
+ builder.go();
+
+ // LEFT JOIN
+ query = String.format("select * from dfs_test.`%s` L %s join dfs_test.`%s` R on L.kl=R.kr",
+ left_dir.toPath().toString(), "left", right_dir.toPath().toString());
+
+ builder = testBuilder()
+ .sqlQuery(query)
+ .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false; alter session set `exec.enable_union_type` = true")
+ .unOrdered()
+ .baselineColumns("kl", "vl", "kr", "vr");
+
+ for (long i = 0; i < 2000; ++i) {
+ builder.baselineValues(i, i, null, null);
+ }
+ for (long i = 1000; i < 2000; ++i) {
+ builder.baselineValues((double)i, (double)i, null, null);
+ }
+ for (long i = 2000; i < 3000; ++i) {
+ builder.baselineValues(i, i, i, i);
+ builder.baselineValues((double)i, (double)i, i, i);
+ }
+ for (long i = 3000; i < 5000; ++i) {
+ builder.baselineValues(i, i, i, i);
+ builder.baselineValues(i, i, (double)i, (double)i);
+ builder.baselineValues((double)i, (double)i, i, i);
+ builder.baselineValues((double)i, (double)i, (double)i, (double)i);
+ }
+ for (long i = 5000; i < 6000; ++i) {
+ builder.baselineValues((double) i, (double)i, i, i);
+ builder.baselineValues((double)i, (double)i, (double)i, (double)i);
+ }
+ builder.go();
+ }
+
+ @Test
+ //@Ignore
+ public void testNumericStringTypes() throws Exception {
+ final File left_dir = new File(BaseTestQuery.getTempDir("mergejoin-schemachanges-left"));
+ final File right_dir = new File(BaseTestQuery.getTempDir("mergejoin-schemachanges-right"));
+ left_dir.mkdirs();
+ right_dir.mkdirs();
+
+ // left side int and strings
+ BufferedWriter writer = new BufferedWriter(new FileWriter(new File(left_dir, "l1.json")));
+ for (int i = 0; i < 5000; ++i) {
+ writer.write(String.format("{ \"kl\" : %d , \"vl\": %d }\n", i, i));
+ }
+ writer.close();
+ writer = new BufferedWriter(new FileWriter(new File(left_dir, "l2.json")));
+ for (int i = 1000; i < 6000; ++i) {
+ writer.write(String.format("{ \"kl\" : \"%s\" , \"vl\": \"%s\" }\n", i, i));
+ }
+ writer.close();
+
+ // right side is float and strings
+ writer = new BufferedWriter(new FileWriter(new File(right_dir, "r1.json")));
+ for (int i = 2000; i < 7000; ++i) {
+ writer.write(String.format("{ \"kr\" : %f , \"vr\": %f }\n", (float)i, (float)i));
+ }
+ writer.close();
+ writer = new BufferedWriter(new FileWriter(new File(right_dir, "r2.json")));
+ for (int i = 3000; i < 8000; ++i) {
+ writer.write(String.format("{ \"kr\" : \"%s\", \"vr\": \"%s\" }\n", i, i));
+ }
+ writer.close();
+
+ // INNER JOIN
+ String query = String.format("select * from dfs_test.`%s` L %s join dfs_test.`%s` R on L.kl=R.kr",
+ left_dir.toPath().toString(), "inner", right_dir.toPath().toString());
+
+ TestBuilder builder = testBuilder()
+ .sqlQuery(query)
+ .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false; alter session set `exec.enable_union_type` = true")
+ .unOrdered()
+ .baselineColumns("kl", "vl", "kr", "vr");
+
+ for (long i = 2000; i < 5000; ++i) {
+ builder.baselineValues(i, i, (double)i, (double)i);
+ }
+ for (long i = 3000; i < 6000; ++i) {
+ final String d = Long.toString(i);
+ builder.baselineValues(d, d, d, d);
+ }
+ builder.go();
+
+ // RIGHT JOIN
+ query = String.format("select * from dfs_test.`%s` L %s join dfs_test.`%s` R on L.kl=R.kr",
+ left_dir.toPath().toString(), "right", right_dir.toPath().toString());
+
+ builder = testBuilder()
+ .sqlQuery(query)
+ .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false; alter session set `exec.enable_union_type` = true")
+ .unOrdered()
+ .baselineColumns("kl", "vl", "kr", "vr");
+
+ for (long i = 2000; i < 5000; ++i) {
+ builder.baselineValues(i, i, (double)i, (double)i);
+ }
+ for (long i = 3000; i < 6000; ++i) {
+ final String d = Long.toString(i);
+ builder.baselineValues(d, d, d, d);
+ }
+ for (long i = 5000; i < 7000; ++i) {
+ builder.baselineValues(null, null, (double)i, (double)i);
+ }
+ for (long i = 6000; i < 8000; ++i) {
+ final String d = Long.toString(i);
+ builder.baselineValues(null, null, d, d);
+ }
+ builder.go();
+ }
+
+ @Test
+ //@Ignore
+ public void testMissingAndNewColumns() throws Exception {
+ final File left_dir = new File(BaseTestQuery.getTempDir("mergejoin-schemachanges-left"));
+ final File right_dir = new File(BaseTestQuery.getTempDir("mergejoin-schemachanges-right"));
+ left_dir.mkdirs();
+ right_dir.mkdirs();
+ System.out.println(left_dir);
+ System.out.println(right_dir);
+
+ // left side: only l2.json has columns kl/vl; l1.json and l3.json use kl1/vl1 and kl2/vl2 instead
+ BufferedWriter writer = new BufferedWriter(new FileWriter(new File(left_dir, "l1.json")));
+ for (int i = 0; i < 50; ++i) {
+ writer.write(String.format("{ \"kl1\" : %d , \"vl1\": %d }\n", i, i));
+ }
+ writer.close();
+
+ writer = new BufferedWriter(new FileWriter(new File(left_dir, "l2.json")));
+ for (int i = 50; i < 100; ++i) {
+ writer.write(String.format("{ \"kl\" : %d , \"vl\": %d }\n", i, i));
+ }
+ writer.close();
+
+ writer = new BufferedWriter(new FileWriter(new File(left_dir, "l3.json")));
+ for (int i = 100; i < 150; ++i) {
+ writer.write(String.format("{ \"kl2\" : %d , \"vl2\": %d }\n", i, i));
+ }
+ writer.close();
+
+ // right side: only r2.json has columns kr/vr; r1.json and r3.json use kr1/vr1 and kr2/vr2 instead
+ writer = new BufferedWriter(new FileWriter(new File(right_dir, "r1.json")));
+ for (int i = 0; i < 50; ++i) {
+ writer.write(String.format("{ \"kr1\" : %f , \"vr1\": %f }\n", (float)i, (float)i));
+ }
+ writer.close();
+
+ writer = new BufferedWriter(new FileWriter(new File(right_dir, "r2.json")));
+ for (int i = 50; i < 100; ++i) {
+ writer.write(String.format("{ \"kr\" : %f , \"vr\": %f }\n", (float)i, (float)i));
+ }
+ writer.close();
+
+ writer = new BufferedWriter(new FileWriter(new File(right_dir, "r3.json")));
+ for (int i = 100; i < 150; ++i) {
+ writer.write(String.format("{ \"kr2\" : %f , \"vr2\": %f }\n", (float)i, (float)i));
+ }
+ writer.close();
+
+ // INNER JOIN
+ String query = String.format("select * from dfs_test.`%s` L %s join dfs_test.`%s` R on L.kl=R.kr",
+ left_dir.toPath().toString(), "inner", right_dir.toPath().toString());
+
+ TestBuilder builder = testBuilder()
+ .sqlQuery(query)
+ .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false; alter session set `exec.enable_union_type` = true")
+ .unOrdered()
+ .baselineColumns("kl", "vl", "kr", "vr", "kl1", "vl1", "kl2", "vl2", "kr1", "vr1", "kr2", "vr2");
+
+ for (long i = 50; i < 100; ++i) {
+ builder.baselineValues(i, i, (double)i, (double)i, null, null, null, null, null, null, null, null);
+ }
+ builder.go();
+
+ // LEFT JOIN
+ query = String.format("select * from dfs_test.`%s` L %s join dfs_test.`%s` R on L.kl=R.kr",
+ left_dir.toPath().toString(), "left", right_dir.toPath().toString());
+
+ builder = testBuilder()
+ .sqlQuery(query)
+ .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false; alter session set `exec.enable_union_type` = true")
+ .unOrdered()
+ .baselineColumns("kl", "vl", "kr", "vr", "kl1", "vl1", "kl2", "vl2", "kr1", "vr1", "kr2", "vr2");
+
+ for (long i = 0; i < 50; ++i) {
+ builder.baselineValues(null, null, null, null, i, i, null, null, null, null, null, null);
+ }
+ for (long i = 50; i < 100; ++i) {
+ builder.baselineValues(i, i, (double)i, (double)i, null, null, null, null, null, null, null, null);
+ }
+ for (long i = 100; i < 150; ++i) {
+ builder.baselineValues(null, null, null, null, null, null, i, i, null, null, null, null);
+ }
+ builder.go();
+
+ // RIGHT JOIN
+ query = String.format("select * from dfs_test.`%s` L %s join dfs_test.`%s` R on L.kl=R.kr",
+ left_dir.toPath().toString(), "right", right_dir.toPath().toString());
+
+ builder = testBuilder()
+ .sqlQuery(query)
+ .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false; alter session set `exec.enable_union_type` = true")
+ .unOrdered()
+ .baselineColumns("kl", "vl", "kr", "vr", "kl1", "vl1", "kl2", "vl2", "kr1", "vr1", "kr2", "vr2");
+
+ for (long i = 0; i < 50; ++i) {
+ builder.baselineValues(null, null, null, null, null, null, null, null, (double)i, (double)i, null, null);
+ }
+ for (long i = 50; i < 100; ++i) {
+ builder.baselineValues(i, i, (double)i, (double)i, null, null, null, null, null, null, null, null);
+ }
+ for (long i = 100; i < 150; ++i) {
+ builder.baselineValues(null, null, null, null, null, null, null, null, null, null, (double)i, (double)i);
+ }
+ builder.go();
+ }
+
+ @Test
+ //@Ignore
+ public void testOneSideSchemaChanges() throws Exception {
+ final File left_dir = new File(BaseTestQuery.getTempDir("mergejoin-schemachanges-left"));
+ final File right_dir = new File(BaseTestQuery.getTempDir("mergejoin-schemachanges-right"));
+ left_dir.mkdirs();
+ right_dir.mkdirs();
+ System.out.println(left_dir);
+ System.out.println(right_dir);
+
+ BufferedWriter writer = new BufferedWriter(new FileWriter(new File(left_dir, "l1.json")));
+ for (int i = 0; i < 50; ++i) {
+ writer.write(String.format("{ \"kl\" : %d , \"vl\": %d }\n", i, i));
+ }
+ for (int i = 50; i < 100; ++i) {
+ writer.write(String.format("{ \"kl\" : %f , \"vl\": %f }\n", (float) i, (float) i));
+ }
+ writer.close();
+
+ writer = new BufferedWriter(new FileWriter(new File(right_dir, "r1.json")));
+ for (int i = 0; i < 50; ++i) {
+ writer.write(String.format("{ \"kl\" : %d , \"vl\": %d }\n", i, i));
+ }
+ writer.close();
+
+ String query = String.format("select * from dfs_test.`%s` L %s join dfs_test.`%s` R on L.kl=R.kl",
+ left_dir.toPath().toString(), "inner", right_dir.toPath().toString());
+ TestBuilder builder = testBuilder()
+ .sqlQuery(query)
+ .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false; alter session set `exec.enable_union_type` = true")
+ .unOrdered()
+ .baselineColumns("kl", "vl", "kl0", "vl0");
+
+ for (long i = 0; i < 50; ++i) {
+ builder.baselineValues(i, i, i, i);
+ }
+ builder.go();
+ }
+}
[2/2] drill git commit: DRILL-4182 TopN schema changes support.
Posted by sm...@apache.org.
DRILL-4182 TopN schema changes support.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e529df46
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e529df46
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e529df46
Branch: refs/heads/master
Commit: e529df46054b685d72ff424c58e38a5dcdbc381a
Parents: cc9175c
Author: Amit Hadke <am...@gmail.com>
Authored: Thu Dec 10 00:21:52 2015 -0800
Committer: Steven Phillips <sm...@apache.org>
Committed: Sun Dec 13 23:23:22 2015 -0800
----------------------------------------------------------------------
.../impl/TopN/PriorityQueueTemplate.java | 16 +-
.../exec/physical/impl/TopN/TopNBatch.java | 75 ++++++-
.../impl/svremover/RemovingRecordBatch.java | 2 +-
.../drill/exec/record/HyperVectorWrapper.java | 3 +-
.../apache/drill/exec/record/SchemaUtil.java | 104 +++++----
.../drill/exec/record/VectorContainer.java | 6 +-
.../apache/drill/exec/util/BatchPrinter.java | 20 +-
.../impl/TopN/TestTopNSchemaChanges.java | 211 +++++++++++++++++++
8 files changed, 384 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/e529df46/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
index 5cdfc5d..2b1830e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
@@ -70,12 +70,17 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
newContainer.add(container.getValueAccessorById(field.getValueClass(), ids).getValueVectors());
}
newContainer.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
+ // Cleanup before recreating hyperbatch and sv4.
+ cleanup();
hyperBatch = new ExpandableHyperContainer(newContainer);
batchCount = hyperBatch.iterator().next().getValueVectors().length;
final DrillBuf drillBuf = allocator.buffer(4 * (limit + 1));
heapSv4 = new SelectionVector4(drillBuf, limit, Character.MAX_VALUE);
+ // Reset queue size (it was most likely equal to limit); it is recounted while the heap is refilled below.
+ queueSize = 0;
for (int i = 0; i < v4.getTotalCount(); i++) {
heapSv4.set(i, v4.get(i));
+ ++queueSize;
}
v4.clear();
doSetup(context, hyperBatch, null);
@@ -146,8 +151,15 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
@Override
public void cleanup() {
- heapSv4.clear();
- hyperBatch.clear();
+ if (heapSv4 != null) {
+ heapSv4.clear();
+ }
+ if (hyperBatch != null) {
+ hyperBatch.clear();
+ }
+ if (finalSv4 != null) {
+ finalSv4.clear();
+ }
}
private void siftUp() {
http://git-wip-us.apache.org/repos/asf/drill/blob/e529df46/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 54d2839..8c4cf21 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -49,6 +49,7 @@ import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.ExpandableHyperContainer;
import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.SchemaUtil;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
@@ -76,6 +77,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
private final RecordBatch incoming;
private BatchSchema schema;
+ private boolean schemaChanged = false;
private PriorityQueue priorityQueue;
private TopN config;
SelectionVector4 sv4;
@@ -143,6 +145,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
}
container.buildSchema(SelectionVectorMode.NONE);
container.setRecordCount(0);
+
return;
case STOP:
state = BatchState.STOP;
@@ -202,9 +205,16 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
// only change in the case that the schema truly changes. Artificial schema changes are ignored.
if (!incoming.getSchema().equals(schema)) {
if (schema != null) {
- throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+ if (!unionTypeEnabled) {
+ throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+ } else {
+ this.schema = SchemaUtil.mergeSchemas(this.schema, incoming.getSchema());
+ purgeAndResetPriorityQueue();
+ this.schemaChanged = true;
+ }
+ } else {
+ this.schema = incoming.getSchema();
}
- this.schema = incoming.getSchema();
}
// fall through.
case OK:
@@ -216,11 +226,17 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
}
countSincePurge += incoming.getRecordCount();
batchCount++;
- RecordBatchData batch = new RecordBatchData(incoming);
+ RecordBatchData batch;
+ if (schemaChanged) {
+ batch = new RecordBatchData(SchemaUtil.coerceContainer(incoming, this.schema, oContext));
+ } else {
+ batch = new RecordBatchData(incoming);
+ }
boolean success = false;
try {
batch.canonicalize();
if (priorityQueue == null) {
+ assert !schemaChanged;
priorityQueue = createNewPriorityQueue(context, config.getOrderings(), new ExpandableHyperContainer(batch.getContainer()), MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
}
priorityQueue.add(context, batch);
@@ -255,7 +271,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
container.add(w.getValueVectors());
}
container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
-
recordCount = sv4.getCount();
return IterOutcome.OK_NEW_SCHEMA;
@@ -323,7 +338,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
for (Ordering od : orderings) {
// first, we rewrite the evaluation stack for each side of the comparison.
ErrorCollector collector = new ErrorCollectorImpl();
- final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry(), unionTypeEnabled);
if (collector.hasErrors()) {
throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
}
@@ -356,6 +371,56 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
return q;
}
+ /**
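+ * Handles a schema change that arrives during execution:
+ * 1. Purges the existing batches by copying the rows still referenced by the heap SV4 into new containers.
+ * 2. Coerces the copied data to the merged schema and canonicalizes the resulting container.
+ * 3. Recreates the priority queue and resets it with the coerced container.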
+ * @throws SchemaChangeException
+ */
+ public void purgeAndResetPriorityQueue() throws SchemaChangeException, ClassTransformationException, IOException {
+ final Stopwatch watch = new Stopwatch();
+ watch.start();
+ final VectorContainer c = priorityQueue.getHyperBatch();
+ final VectorContainer newContainer = new VectorContainer(oContext);
+ final SelectionVector4 selectionVector4 = priorityQueue.getHeapSv4();
+ final SimpleRecordBatch batch = new SimpleRecordBatch(c, selectionVector4, context);
+ final SimpleRecordBatch newBatch = new SimpleRecordBatch(newContainer, null, context);
+ copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(), newContainer, newBatch, null);
+ SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator());
+ try {
+ do {
+ final int count = selectionVector4.getCount();
+ final int copiedRecords = copier.copyRecords(0, count);
+ assert copiedRecords == count;
+ for (VectorWrapper<?> v : newContainer) {
+ ValueVector.Mutator m = v.getValueVector().getMutator();
+ m.setValueCount(count);
+ }
+ newContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+ newContainer.setRecordCount(count);
+ builder.add(newBatch);
+ } while (selectionVector4.next());
+ selectionVector4.clear();
+ c.clear();
+ final VectorContainer oldSchemaContainer = new VectorContainer(oContext);
+ builder.canonicalize();
+ builder.build(context, oldSchemaContainer);
+ oldSchemaContainer.setRecordCount(builder.getSv4().getCount());
+ final VectorContainer newSchemaContainer = SchemaUtil.coerceContainer(oldSchemaContainer, this.schema, oContext);
+ // Canonicalize new container since we canonicalize incoming batches before adding to queue.
+ final VectorContainer canonicalizedContainer = VectorContainer.canonicalize(newSchemaContainer);
+ canonicalizedContainer.buildSchema(SelectionVectorMode.FOUR_BYTE);
+ priorityQueue.cleanup();
+ priorityQueue = createNewPriorityQueue(context, config.getOrderings(), canonicalizedContainer, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
+ priorityQueue.resetQueue(canonicalizedContainer, builder.getSv4().createNewWrapperCurrent());
+ } finally {
+ builder.clear();
+ builder.close();
+ }
+ logger.debug("Took {} us to purge and recreate queue for new schema", watch.elapsed(TimeUnit.MICROSECONDS));
+ }
+
@Override
public WritableBatch getWritableBatch() {
throw new UnsupportedOperationException("A sort batch is not writable.");
http://git-wip-us.apache.org/repos/asf/drill/blob/e529df46/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index fa6001b..5faaf58 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -62,7 +62,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
@Override
protected boolean setupNewSchema() throws SchemaChangeException {
- container.zeroVectors();
+ container.clear();
switch(incoming.getSchema().getSelectionVectorMode()){
case NONE:
this.copier = getStraightCopier();
http://git-wip-us.apache.org/repos/asf/drill/blob/e529df46/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
index a1557e6..7fc7960 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
@@ -132,7 +132,8 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<
}
public void addVector(ValueVector v) {
- Preconditions.checkArgument(v.getClass() == this.getVectorClass(), String.format("Cannot add vector type %s to hypervector type %s", v.getClass(), this.getVectorClass()));
+ Preconditions.checkArgument(v.getClass() == this.getVectorClass(), String.format("Cannot add vector type %s to hypervector type %s for field %s",
+ v.getClass(), this.getVectorClass(), v.getField()));
vectors = (T[]) ArrayUtils.add(vectors, v);// TODO optimize this so not copying every time
}
http://git-wip-us.apache.org/repos/asf/drill/blob/e529df46/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
index 8a0954e..8cf90ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
@@ -96,6 +96,49 @@ public class SchemaUtil {
return s;
}
+  /**
+   * Coerces a single vector so that it matches {@code field}, the column's definition in the target schema.
+   * <ul>
+   *   <li>{@code v == null}: a new vector is created for {@code field}, allocated, and given a value count of
+   *       {@code recordCount}.</li>
+   *   <li>Same minor type: the vector's contents are moved through a {@link TransferPair}; when the type is
+   *       UNION, any sub types listed in {@code field} that the vector does not yet list are added.</li>
+   *   <li>Different minor type: the target must be UNION. The transferred vector is added to a new union vector
+   *       and positions {@code 0..valueCount-1} are tagged with the source vector's minor type.</li>
+   * </ul>
+   *
+   * @param v           source vector, or {@code null} when the caller found no vector for this field
+   * @param c           container being built by the caller; not used here, kept so the signature is unchanged
+   * @param field       definition of the column in the target schema
+   * @param recordCount value count given to a newly created vector when {@code v} is {@code null}
+   * @param context     operator context supplying the allocator for new vectors
+   * @return the coerced vector
+   * @throws IllegalStateException if the minor types differ and the target type is not UNION
+   */
+  private static ValueVector coerceVector(ValueVector v, VectorContainer c, MaterializedField field,
+                                          int recordCount, OperatorContext context) {
+    if (v == null) {
+      final ValueVector created = TypeHelper.getNewVector(field, context.getAllocator());
+      created.allocateNew();
+      created.getMutator().setValueCount(recordCount);
+      return created;
+    }
+
+    final MinorType sourceType = v.getField().getType().getMinorType();
+    final MinorType targetType = field.getType().getMinorType();
+    final boolean sameType = sourceType.equals(targetType);
+    // Validate before transferring or allocating anything, so a failed check cannot strand buffers.
+    Preconditions.checkState(sameType || targetType == MinorType.UNION, "Can only convert vector to Union vector");
+
+    final int valueCount = v.getAccessor().getValueCount();
+    final TransferPair tp = v.getTransferPair();
+    tp.transfer();
+
+    if (sameType) {
+      if (targetType == MinorType.UNION) {
+        addMissingSubTypes((UnionVector) tp.getTo(), field);
+      }
+      return tp.getTo();
+    }
+
+    // Type mismatch: wrap the transferred vector inside a union vector of the target type.
+    final UnionVector u = (UnionVector) TypeHelper.getNewVector(field, context.getAllocator());
+    u.addVector(tp.getTo());
+    for (int i = 0; i < valueCount; i++) {
+      u.getMutator().setType(i, sourceType);
+    }
+    addMissingSubTypes(u, field);
+    u.getMutator().setValueCount(valueCount);
+    return u;
+  }
+
+  /** Adds to {@code u} every sub type listed in {@code field} that {@code u}'s own field does not already list. */
+  private static void addMissingSubTypes(UnionVector u, MaterializedField field) {
+    for (MinorType t : field.getType().getSubTypeList()) {
+      if (!u.getField().getType().getSubTypeList().contains(t)) {
+        u.addSubType(t);
+      }
+    }
+  }
+
/**
* Creates a copy a record batch, converting any fields as necessary to coerce it into the provided schema
* @param in
@@ -105,54 +148,39 @@ public class SchemaUtil {
*/
public static VectorContainer coerceContainer(VectorAccessible in, BatchSchema toSchema, OperatorContext context) {
int recordCount = in.getRecordCount();
- Map<SchemaPath,ValueVector> vectorMap = Maps.newHashMap();
+ boolean isHyper = false;
+ Map<SchemaPath, Object> vectorMap = Maps.newHashMap();
for (VectorWrapper w : in) {
- ValueVector v = w.getValueVector();
- vectorMap.put(v.getField().getPath(), v);
+ if (w.isHyper()) {
+ isHyper = true;
+ final ValueVector[] vvs = w.getValueVectors();
+ vectorMap.put(vvs[0].getField().getPath(), vvs);
+ } else {
+ assert !isHyper;
+ final ValueVector v = w.getValueVector();
+ vectorMap.put(v.getField().getPath(), v);
+ }
}
VectorContainer c = new VectorContainer(context);
for (MaterializedField field : toSchema) {
- ValueVector v = vectorMap.remove(field.getPath());
- if (v != null) {
- int valueCount = v.getAccessor().getValueCount();
- TransferPair tp = v.getTransferPair();
- tp.transfer();
- if (v.getField().getType().getMinorType().equals(field.getType().getMinorType())) {
- if (field.getType().getMinorType() == MinorType.UNION) {
- UnionVector u = (UnionVector) tp.getTo();
- for (MinorType t : field.getType().getSubTypeList()) {
- if (u.getField().getType().getSubTypeList().contains(t)) {
- continue;
- }
- u.addSubType(t);
- }
- }
- c.add(tp.getTo());
+ if (isHyper) {
+ final ValueVector[] vvs = (ValueVector[]) vectorMap.remove(field.getPath());
+ final ValueVector[] vvsOut;
+ if (vvs == null) {
+ vvsOut = new ValueVector[1];
+ vvsOut[0] = coerceVector(null, c, field, recordCount, context);
} else {
- ValueVector newVector = TypeHelper.getNewVector(field, context.getAllocator());
- Preconditions.checkState(field.getType().getMinorType() == MinorType.UNION, "Can only convert vector to Union vector");
- UnionVector u = (UnionVector) newVector;
- u.addVector(tp.getTo());
- MinorType type = v.getField().getType().getMinorType();
- for (int i = 0; i < valueCount; i++) {
- u.getMutator().setType(i, type);
- }
- for (MinorType t : field.getType().getSubTypeList()) {
- if (u.getField().getType().getSubTypeList().contains(t)) {
- continue;
- }
- u.addSubType(t);
+ vvsOut = new ValueVector[vvs.length];
+ for (int i = 0; i < vvs.length; ++i) {
+ vvsOut[i] = coerceVector(vvs[i], c, field, recordCount, context);
}
- u.getMutator().setValueCount(valueCount);
- c.add(u);
}
+ c.add(vvsOut);
} else {
- v = TypeHelper.getNewVector(field, context.getAllocator());
- v.allocateNew();
- v.getMutator().setValueCount(recordCount);
- c.add(v);
+ final ValueVector v = (ValueVector) vectorMap.remove(field.getPath());
+ c.add(coerceVector(v, c, field, recordCount, context));
}
}
c.buildSchema(in.getSchema().getSelectionVectorMode());
http://git-wip-us.apache.org/repos/asf/drill/blob/e529df46/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index ccc05ff..c483650 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -187,7 +187,11 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
});
for (VectorWrapper<?> w : canonicalWrappers) {
- vc.add(w.getValueVector());
+ if (w.isHyper()) {
+ vc.add(w.getValueVectors());
+ } else {
+ vc.add(w.getValueVector());
+ }
}
vc.oContext = original.oContext;
return vc;
http://git-wip-us.apache.org/repos/asf/drill/blob/e529df46/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
index 198c0b5..2a1db01 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
@@ -42,15 +42,25 @@ public class BatchPrinter {
}
int width = columns.size();
for (int j = 0; j < sv4.getCount(); j++) {
+ if (j%50 == 0) {
+ System.out.println(StringUtils.repeat("-", width * 17 + 1));
+ for (String column : columns) {
+ System.out.printf("| %-15s", width <= 15 ? column : column.substring(0, 14));
+ }
+ System.out.printf("|\n");
+ System.out.println(StringUtils.repeat("-", width*17 + 1));
+ }
for (VectorWrapper vw : batch) {
Object o = vw.getValueVectors()[sv4.get(j) >>> 16].getAccessor().getObject(sv4.get(j) & 65535);
- if (o instanceof byte[]) {
- String value = new String((byte[]) o);
- System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0, 14));
+ String value;
+ if (o == null) {
+ value = "null";
+ } else if (o instanceof byte[]) {
+ value = new String((byte[]) o);
} else {
- String value = o.toString();
- System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0,14));
+ value = o.toString();
}
+ System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0,14));
}
System.out.printf("|\n");
}
http://git-wip-us.apache.org/repos/asf/drill/blob/e529df46/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
new file mode 100644
index 0000000..0f65bab
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.TopN;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.TestBuilder;
+import org.apache.drill.exec.physical.impl.aggregate.InternalBatch;
+import org.junit.Test;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+
+/**
+ * Runs {@code ORDER BY ... LIMIT} queries over JSON files whose column types differ between rows or files.
+ * Every verified query is executed with {@code exec.enable_union_type} set to true.
+ */
+public class TestTopNSchemaChanges extends BaseTestQuery {
+
+  /** Keys are integers in one file and floats in the other; expects the 12 smallest keys, alternating types. */
+  @Test
+  public void testNumericTypes() throws Exception {
+    final File dataDir = new File(BaseTestQuery.getTempDir("topn-schemachanges"));
+    dataDir.mkdirs();
+
+    // d1.json: even keys written as integers
+    try (BufferedWriter writer = new BufferedWriter(new FileWriter(new File(dataDir, "d1.json")))) {
+      for (int i = 0; i < 10000; i+=2) {
+        writer.write(String.format("{ \"kl\" : %d , \"vl\": %d }\n", i, i));
+      }
+    }
+    // d2.json: odd keys written as floats
+    try (BufferedWriter writer = new BufferedWriter(new FileWriter(new File(dataDir, "d2.json")))) {
+      for (int i = 1; i < 10000; i+=2) {
+        writer.write(String.format("{ \"kl\" : %f , \"vl\": %f }\n", (float)i, (float)i));
+      }
+    }
+    String query = String.format("select * from dfs_test.`%s` order by kl limit 12", dataDir.toPath().toString());
+
+    TestBuilder builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+      .ordered()
+      .baselineColumns("kl", "vl");
+
+    for (long i = 0; i < 12; ++i) {
+      if (i % 2 == 0) {
+        builder.baselineValues(i, i);
+      } else {
+        builder.baselineValues((double)i, (double)i);
+      }
+    }
+    builder.go();
+  }
+
+  /**
+   * Keys are integers in one file and strings in the other. Checks the first 12 rows in ascending order
+   * (expected to be the integers) and in descending order (expected to be the strings).
+   */
+  @Test
+  public void testNumericAndStringTypes() throws Exception {
+    final File dataDir = new File(BaseTestQuery.getTempDir("topn-schemachanges"));
+    dataDir.mkdirs();
+
+    // d1.json: even keys written as integers
+    try (BufferedWriter writer = new BufferedWriter(new FileWriter(new File(dataDir, "d1.json")))) {
+      for (int i = 0; i < 1000; i+=2) {
+        writer.write(String.format("{ \"kl\" : %d , \"vl\": %d }\n", i, i));
+      }
+    }
+    // d2.json: odd keys written as quoted strings
+    try (BufferedWriter writer = new BufferedWriter(new FileWriter(new File(dataDir, "d2.json")))) {
+      for (int i = 1; i < 1000; i+=2) {
+        writer.write(String.format("{ \"kl\" : \"%s\" , \"vl\": \"%s\" }\n", i, i));
+      }
+    }
+    String query = String.format("select * from dfs_test.`%s` order by kl limit 12", dataDir.toPath().toString());
+
+    TestBuilder builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+      .ordered()
+      .baselineColumns("kl", "vl");
+
+    for (long i = 0; i < 24; i+=2) {
+      builder.baselineValues(i, i);
+    }
+    // Run the ascending query; without this call the baselines above were built but never verified.
+    builder.go();
+
+    query = String.format("select * from dfs_test.`%s` order by kl desc limit 12", dataDir.toPath().toString());
+    builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+      .ordered()
+      .baselineColumns("kl", "vl")
+      .baselineValues("999", "999")
+      .baselineValues("997", "997")
+      .baselineValues("995", "995")
+      .baselineValues("993", "993")
+      .baselineValues("991", "991")
+      .baselineValues("99", "99")
+      .baselineValues("989", "989")
+      .baselineValues("987", "987")
+      .baselineValues("985", "985")
+      .baselineValues("983", "983")
+      .baselineValues("981", "981")
+      .baselineValues("979", "979");
+    builder.go();
+  }
+
+  /** A single file mixes integer, float and string keys; expects the numeric rows first, then the string "2". */
+  @Test
+  public void testUnionTypes() throws Exception {
+    final File dataDir = new File(BaseTestQuery.getTempDir("topn-schemachanges"));
+    dataDir.mkdirs();
+
+    // union of int and float and string.
+    try (BufferedWriter writer = new BufferedWriter(new FileWriter(new File(dataDir, "d1.json")))) {
+      for (int i = 0; i <= 9; ++i) {
+        switch (i%3) {
+          case 0: // 0, 3, 6, 9
+            writer.write(String.format("{ \"kl\" : %d , \"vl\": %d }\n", i, i));
+            break;
+          case 1: // 1, 4, 7
+            writer.write(String.format("{ \"kl\" : %f , \"vl\": %f }\n", (float)i, (float)i));
+            break;
+          case 2: // 2, 5, 8
+            writer.write(String.format("{ \"kl\" : \"%s\" , \"vl\": \"%s\" }\n", i, i));
+            break;
+        }
+      }
+    }
+    String query = String.format("select * from dfs_test.`%s` order by kl limit 8", dataDir.toPath().toString());
+
+    TestBuilder builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+      .ordered()
+      .baselineColumns("kl", "vl");
+
+    builder.baselineValues(0L, 0L);
+    builder.baselineValues(1.0d, 1.0d);
+    builder.baselineValues(3L, 3L);
+    builder.baselineValues(4.0d, 4.0d);
+    builder.baselineValues(6L, 6L);
+    builder.baselineValues(7.0d, 7.0d);
+    builder.baselineValues(9L, 9L);
+    builder.baselineValues("2", "2");
+    builder.go();
+  }
+
+  /**
+   * Three files each define a different pair of columns (kl1/vl1, kl/vl, kl2/vl2). Sorting on one key column
+   * is expected to return rows from the file that defines it, with the other columns null.
+   */
+  @Test
+  public void testMissingColumn() throws Exception {
+    final File dataDir = new File(BaseTestQuery.getTempDir("topn-schemachanges"));
+    dataDir.mkdirs();
+
+    // d1.json: integer columns kl1/vl1
+    try (BufferedWriter writer = new BufferedWriter(new FileWriter(new File(dataDir, "d1.json")))) {
+      for (int i = 0; i < 100; i++) {
+        writer.write(String.format("{ \"kl1\" : %d , \"vl1\": %d }\n", i, i));
+      }
+    }
+    // d2.json: float columns kl/vl
+    try (BufferedWriter writer = new BufferedWriter(new FileWriter(new File(dataDir, "d2.json")))) {
+      for (int i = 100; i < 200; i++) {
+        writer.write(String.format("{ \"kl\" : %f , \"vl\": %f }\n", (float)i, (float)i));
+      }
+    }
+    // d3.json: string columns kl2/vl2
+    try (BufferedWriter writer = new BufferedWriter(new FileWriter(new File(dataDir, "d3.json")))) {
+      for (int i = 200; i < 300; i++) {
+        writer.write(String.format("{ \"kl2\" : \"%s\" , \"vl2\": \"%s\" }\n", i, i));
+      }
+    }
+
+    String query = String.format("select kl, vl, kl1, vl1, kl2, vl2 from dfs_test.`%s` order by kl limit 3", dataDir.toPath().toString());
+    TestBuilder builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+      .ordered()
+      .baselineColumns("kl", "vl", "kl1", "vl1", "kl2", "vl2")
+      .baselineValues(100.0d, 100.0d, null, null, null, null)
+      .baselineValues(101.0d, 101.0d, null, null, null, null)
+      .baselineValues(102.0d, 102.0d, null, null, null, null);
+    builder.go();
+
+    query = String.format("select kl, vl, kl1, vl1, kl2, vl2 from dfs_test.`%s` order by kl1 limit 3", dataDir.toPath().toString());
+    builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+      .ordered()
+      .baselineColumns("kl", "vl", "kl1", "vl1", "kl2", "vl2")
+      .baselineValues(null, null, 0L, 0L, null, null)
+      .baselineValues(null, null, 1L, 1L, null, null)
+      .baselineValues(null, null, 2L, 2L, null, null);
+    builder.go();
+
+    query = String.format("select kl, vl, kl1, vl1, kl2, vl2 from dfs_test.`%s` order by kl2 desc limit 3", dataDir.toPath().toString());
+    builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+      .ordered()
+      .baselineColumns("kl", "vl", "kl1", "vl1", "kl2", "vl2")
+      .baselineValues(null, null, null, null, "299", "299")
+      .baselineValues(null, null, null, null, "298", "298")
+      .baselineValues(null, null, null, null, "297", "297");
+    builder.go();
+    // Since client can't handle new columns which are not in first batch, we won't test output of query.
+    // Query should run w/o any errors.
+    test(String.format("select * from dfs_test.`%s` order by kl limit 3", dataDir.toPath().toString()));
+  }
+}