Posted to commits@drill.apache.org by sm...@apache.org on 2015/12/14 08:27:57 UTC

[1/2] drill git commit: DRILL-4163 Schema changes support in MergeJoin Operator.

Repository: drill
Updated Branches:
  refs/heads/master bb3fc1521 -> e529df460


DRILL-4163 Schema changes support in MergeJoin Operator.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/cc9175c1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/cc9175c1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/cc9175c1

Branch: refs/heads/master
Commit: cc9175c13270660ffd9ec2ddcbc70780dd72dada
Parents: bb3fc15
Author: Amit Hadke <am...@gmail.com>
Authored: Fri Dec 4 16:38:36 2015 -0800
Committer: Steven Phillips <sm...@apache.org>
Committed: Sun Dec 13 23:22:55 2015 -0800

----------------------------------------------------------------------
 .../exec/physical/impl/join/JoinTemplate.java   |   2 +-
 .../exec/physical/impl/join/JoinUtils.java      |   3 +
 .../exec/physical/impl/join/MergeJoinBatch.java |  11 +-
 .../drill/exec/record/VectorContainer.java      |   5 +-
 .../join/TestMergeJoinWithSchemaChanges.java    | 348 +++++++++++++++++++
 5 files changed, 360 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/cc9175c1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
index 40c47b3..43cbf71 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -60,7 +60,7 @@ public abstract class JoinTemplate implements JoinWorker {
       if (status.left.finished()) {
         return true;
       }
-      final int comparison = doCompare(status.left.getCurrentPosition(), status.right.getCurrentPosition());
+      final int comparison = Integer.signum(doCompare(status.left.getCurrentPosition(), status.right.getCurrentPosition()));
       switch (comparison) {
         case -1:
           // left key < right key
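
For context, a standalone sketch (not part of the patch) of why the result is
normalized: a generated doCompare() only guarantees the sign of its return
value, while the switch above matches exactly -1, 0, and 1.

    public class SignumSketch {
      // Stand-in for the generated doCompare(); its contract is only the sign,
      // not the magnitude, of the returned int.
      static int doCompareStandIn(int leftKey, int rightKey) {
        return leftKey - rightKey; // may return e.g. -4163, not just -1
      }

      public static void main(String[] args) {
        final int comparison = Integer.signum(doCompareStandIn(10, 4173));
        switch (comparison) {
          case -1:
            System.out.println("left key < right key");
            break;
          case 0:
            System.out.println("keys are equal");
            break;
          case 1:
            System.out.println("left key > right key");
            break;
          default:
            throw new IllegalStateException("unreachable once signum is applied");
        }
      }
    }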

http://git-wip-us.apache.org/repos/asf/drill/blob/cc9175c1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
index 2476a83..61640bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
@@ -174,6 +174,9 @@ public class JoinUtils {
       TypeProtos.MinorType rightType = rightExpression.getMajorType().getMinorType();
       TypeProtos.MinorType leftType = leftExpression.getMajorType().getMinorType();
 
+      if (rightType == TypeProtos.MinorType.UNION || leftType == TypeProtos.MinorType.UNION) {
+        continue;
+      }
       if (rightType != leftType) {
 
         // currently we only support implicit casts if the input types are numeric or varchar/varbinary
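
A standalone sketch of the new guard in isolation; MinorType below is a local
stand-in for Drill's TypeProtos.MinorType so the file compiles on its own. A
UNION key carries a per-value type tag, so no single implicit cast can be
chosen at plan time and the static check is skipped.

    public class UnionGuardSketch {
      enum MinorType { INT, FLOAT8, VARCHAR, UNION }

      static boolean needsImplicitCastCheck(MinorType left, MinorType right) {
        // Comparison against a UNION column is resolved per record at run time.
        if (left == MinorType.UNION || right == MinorType.UNION) {
          return false;
        }
        return left != right;
      }

      public static void main(String[] args) {
        System.out.println(needsImplicitCastCheck(MinorType.INT, MinorType.FLOAT8));   // true
        System.out.println(needsImplicitCastCheck(MinorType.UNION, MinorType.FLOAT8)); // false
      }
    }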

http://git-wip-us.apache.org/repos/asf/drill/blob/cc9175c1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index edafbfc..9ef5cde 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -102,6 +102,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
   private JoinWorker worker;
   private boolean areNullsEqual = false; // whether nulls compare equal
 
+
   private static final String LEFT_INPUT = "LEFT INPUT";
   private static final String RIGHT_INPUT = "RIGHT INPUT";
 
@@ -381,13 +382,11 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
   }
 
   private void allocateBatch(boolean newSchema) {
-    // allocate new batch space.
-    container.zeroVectors();
-
     boolean leftAllowed = status.getLeftStatus() != IterOutcome.NONE;
     boolean rightAllowed = status.getRightStatus() != IterOutcome.NONE;
 
     if (newSchema) {
+      container.clear();
       // add fields from both batches
       if (leftAllowed) {
         for (VectorWrapper<?> w : leftIterator) {
@@ -423,6 +422,8 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
           }
         }
       }
+    } else {
+      container.zeroVectors();
     }
     for (VectorWrapper w : container) {
       AllocationHelper.allocateNew(w.getValueVector(), Character.MAX_VALUE);
@@ -477,9 +478,9 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
 
   private LogicalExpression materializeExpression(LogicalExpression expression, IterOutcome lastStatus,
                                                   VectorAccessible input, ErrorCollector collector) throws ClassTransformationException {
-    LogicalExpression materializedExpr = null;
+    LogicalExpression materializedExpr;
     if (lastStatus != IterOutcome.NONE) {
-      materializedExpr = ExpressionTreeMaterializer.materialize(expression, input, collector, context.getFunctionRegistry());
+      materializedExpr = ExpressionTreeMaterializer.materialize(expression, input, collector, context.getFunctionRegistry(), unionTypeEnabled);
     } else {
       materializedExpr = new TypedNullConstant(Types.optional(MinorType.INT));
     }
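
The allocateBatch() change in this file boils down to: rebuild the container
only on a genuine schema change, otherwise zero and reuse the existing
vectors. A toy, self-contained analogue of that pattern (int[] buffers stand
in for value vectors):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class AllocatePatternSketch {
      static final List<int[]> container = new ArrayList<>();

      static void allocateBatch(boolean newSchema) {
        if (newSchema) {
          container.clear();          // schema changed: drop and rebuild vectors
          container.add(new int[16]); // stand-in for re-adding left/right fields
        } else {
          for (int[] v : container) {
            Arrays.fill(v, 0);        // same schema: zero existing vectors in place
          }
        }
      }

      public static void main(String[] args) {
        allocateBatch(true);  // first batch, or a real schema change
        allocateBatch(false); // later batches reuse the same vectors
        System.out.println("vectors in container: " + container.size());
      }
    }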

http://git-wip-us.apache.org/repos/asf/drill/blob/cc9175c1/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 815e2d8..ccc05ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -29,7 +29,6 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
@@ -282,8 +281,8 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
 
     if (fieldIds.length == 1 && clazz != null && !clazz.isAssignableFrom(va.getVectorClass())) {
       throw new IllegalStateException(String.format(
-          "Failure while reading vector.  Expected vector class of %s but was holding vector class %s.",
-          clazz.getCanonicalName(), va.getVectorClass().getCanonicalName()));
+          "Failure while reading vector.  Expected vector class of %s but was holding vector class %s, field= %s ",
+          clazz.getCanonicalName(), va.getVectorClass().getCanonicalName(), va.getField()));
     }
 
     return va.getChildWrapper(fieldIds);

http://git-wip-us.apache.org/repos/asf/drill/blob/cc9175c1/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinWithSchemaChanges.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinWithSchemaChanges.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinWithSchemaChanges.java
new file mode 100644
index 0000000..08aae60
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinWithSchemaChanges.java
@@ -0,0 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.TestBuilder;
+import org.junit.Test;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+public class TestMergeJoinWithSchemaChanges extends BaseTestQuery {
+
+  @Test
+  public void testNumericTypes() throws Exception {
+    final File left_dir = new File(BaseTestQuery.getTempDir("mergejoin-schemachanges-left"));
+    final File right_dir = new File(BaseTestQuery.getTempDir("mergejoin-schemachanges-right"));
+    left_dir.mkdirs();
+    right_dir.mkdirs();
+
+    // First create data for numeric types.
+    // left side int and float vs right side float
+    BufferedWriter writer = new BufferedWriter(new FileWriter(new File(left_dir, "l1.json")));
+    for (int i = 0; i < 5000; ++i) {
+      writer.write(String.format("{ \"kl\" : %d , \"vl\": %d }\n", i, i));
+    }
+    writer.close();
+    writer = new BufferedWriter(new FileWriter(new File(left_dir, "l2.json")));
+    for (int i = 1000; i < 6000; ++i) {
+      writer.write(String.format("{ \"kl\" : %f , \"vl\": %f }\n", (float) i, (float) i));
+    }
+    writer.close();
+
+    // right side is int and float
+    writer = new BufferedWriter(new FileWriter(new File(right_dir, "r1.json")));
+    for (int i = 2000; i < 7000; ++i) {
+      writer.write(String.format("{ \"kr\" : %d , \"vr\": %d }\n", i, i));
+    }
+    writer.close();
+    writer = new BufferedWriter(new FileWriter(new File(right_dir, "r2.json")));
+    for (int i = 3000; i < 8000; ++i) {
+      writer.write(String.format("{ \"kr\" : %f, \"vr\": %f }\n", (float) i, (float) i));
+    }
+    writer.close();
+
+    // INNER JOIN
+    String query = String.format("select * from dfs_test.`%s` L %s join dfs_test.`%s` R on L.kl=R.kr",
+      left_dir.toPath().toString(), "inner", right_dir.toPath().toString());
+
+    TestBuilder builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false; alter session set `exec.enable_union_type` = true")
+      .unOrdered()
+      .baselineColumns("kl", "vl", "kr", "vr");
+
+
+    for (long i = 2000; i < 3000; ++i) {
+      builder.baselineValues(i, i, i, i);
+      builder.baselineValues((double)i, (double)i, i, i);
+    }
+    for (long i = 3000; i < 5000; ++i) {
+      builder.baselineValues(i, i, i, i);
+      builder.baselineValues(i, i, (double)i, (double)i);
+      builder.baselineValues((double)i, (double)i, i, i);
+      builder.baselineValues((double)i, (double)i, (double)i, (double)i);
+    }
+    for (long i = 5000; i < 6000; ++i) {
+      builder.baselineValues((double)i, (double)i, i, i);
+      builder.baselineValues((double) i, (double) i, (double) i, (double) i);
+    }
+    builder.go();
+
+    // LEFT JOIN
+    query = String.format("select * from dfs_test.`%s` L %s join dfs_test.`%s` R on L.kl=R.kr",
+      left_dir.toPath().toString(), "left", right_dir.toPath().toString());
+
+    builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false; alter session set `exec.enable_union_type` = true")
+      .unOrdered()
+      .baselineColumns("kl", "vl", "kr", "vr");
+
+    for (long i = 0; i < 2000; ++i)   {
+      builder.baselineValues(i, i, null, null);
+    }
+    for (long i = 1000; i < 2000; ++i) {
+      builder.baselineValues((double)i, (double)i, null, null);
+    }
+    for (long i = 2000; i < 3000; ++i) {
+      builder.baselineValues(i, i, i, i);
+      builder.baselineValues((double)i, (double)i, i, i);
+    }
+    for (long i = 3000; i < 5000; ++i) {
+      builder.baselineValues(i, i, i, i);
+      builder.baselineValues(i, i, (double)i, (double)i);
+      builder.baselineValues((double)i, (double)i, i, i);
+      builder.baselineValues((double)i, (double)i, (double)i, (double)i);
+    }
+    for (long i = 5000; i < 6000; ++i) {
+      builder.baselineValues((double) i, (double)i, i, i);
+      builder.baselineValues((double)i, (double)i, (double)i, (double)i);
+    }
+    builder.go();
+  }
+
+  @Test
+  public void testNumericStringTypes() throws Exception {
+    final File left_dir = new File(BaseTestQuery.getTempDir("mergejoin-schemachanges-left"));
+    final File right_dir = new File(BaseTestQuery.getTempDir("mergejoin-schemachanges-right"));
+    left_dir.mkdirs();
+    right_dir.mkdirs();
+
+    // left side int and strings
+    BufferedWriter writer = new BufferedWriter(new FileWriter(new File(left_dir, "l1.json")));
+    for (int i = 0; i < 5000; ++i) {
+      writer.write(String.format("{ \"kl\" : %d , \"vl\": %d }\n", i, i));
+    }
+    writer.close();
+    writer = new BufferedWriter(new FileWriter(new File(left_dir, "l2.json")));
+    for (int i = 1000; i < 6000; ++i) {
+      writer.write(String.format("{ \"kl\" : \"%s\" , \"vl\": \"%s\" }\n", i, i));
+    }
+    writer.close();
+
+    // right side is float and strings
+    writer = new BufferedWriter(new FileWriter(new File(right_dir, "r1.json")));
+    for (int i = 2000; i < 7000; ++i) {
+      writer.write(String.format("{ \"kr\" : %f , \"vr\": %f }\n", (float)i, (float)i));
+    }
+    writer.close();
+    writer = new BufferedWriter(new FileWriter(new File(right_dir, "r2.json")));
+    for (int i = 3000; i < 8000; ++i) {
+      writer.write(String.format("{ \"kr\" : \"%s\", \"vr\": \"%s\" }\n", i, i));
+    }
+    writer.close();
+
+    // INNER JOIN
+    String query = String.format("select * from dfs_test.`%s` L %s join dfs_test.`%s` R on L.kl=R.kr",
+      left_dir.toPath().toString(), "inner", right_dir.toPath().toString());
+
+    TestBuilder builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false; alter session set `exec.enable_union_type` = true")
+      .unOrdered()
+      .baselineColumns("kl", "vl", "kr", "vr");
+
+    for (long i = 2000; i < 5000; ++i) {
+      builder.baselineValues(i, i, (double)i, (double)i);
+    }
+    for (long i = 3000; i < 6000; ++i) {
+      final String d = Long.toString(i);
+      builder.baselineValues(d, d, d, d);
+    }
+    builder.go();
+
+    // RIGHT JOIN
+    query = String.format("select * from dfs_test.`%s` L %s join dfs_test.`%s` R on L.kl=R.kr",
+      left_dir.toPath().toString(), "right", right_dir.toPath().toString());
+
+    builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false; alter session set `exec.enable_union_type` = true")
+      .unOrdered()
+      .baselineColumns("kl", "vl", "kr", "vr");
+
+    for (long i = 2000; i < 5000; ++i) {
+      builder.baselineValues(i, i, (double)i, (double)i);
+    }
+    for (long i = 3000; i < 6000; ++i) {
+      final String d = Long.toString(i);
+      builder.baselineValues(d, d, d, d);
+    }
+    for (long i = 5000; i < 7000; ++i) {
+      builder.baselineValues(null, null, (double)i, (double)i);
+    }
+    for (long i = 6000; i < 8000; ++i) {
+      final String d = Long.toString(i);
+      builder.baselineValues(null, null, d, d);
+    }
+    builder.go();
+  }
+
+  @Test
+  public void testMissingAndNewColumns() throws Exception {
+    final File left_dir = new File(BaseTestQuery.getTempDir("mergejoin-schemachanges-left"));
+    final File right_dir = new File(BaseTestQuery.getTempDir("mergejoin-schemachanges-right"));
+    left_dir.mkdirs();
+    right_dir.mkdirs();
+
+    // missing column kl
+    BufferedWriter writer = new BufferedWriter(new FileWriter(new File(left_dir, "l1.json")));
+    for (int i = 0; i < 50; ++i) {
+      writer.write(String.format("{ \"kl1\" : %d , \"vl1\": %d }\n", i, i));
+    }
+    writer.close();
+
+    writer = new BufferedWriter(new FileWriter(new File(left_dir, "l2.json")));
+    for (int i = 50; i < 100; ++i) {
+      writer.write(String.format("{ \"kl\" : %d , \"vl\": %d }\n", i, i));
+    }
+    writer.close();
+
+    writer = new BufferedWriter(new FileWriter(new File(left_dir, "l3.json")));
+    for (int i = 100; i < 150; ++i) {
+      writer.write(String.format("{ \"kl2\" : %d , \"vl2\": %d }\n", i, i));
+    }
+    writer.close();
+
+    // right missing column kr
+    writer = new BufferedWriter(new FileWriter(new File(right_dir, "r1.json")));
+    for (int i = 0; i < 50; ++i) {
+      writer.write(String.format("{ \"kr1\" : %f , \"vr1\": %f }\n", (float)i, (float)i));
+    }
+    writer.close();
+
+    writer = new BufferedWriter(new FileWriter(new File(right_dir, "r2.json")));
+    for (int i = 50; i < 100; ++i) {
+      writer.write(String.format("{ \"kr\" : %f , \"vr\": %f }\n", (float)i, (float)i));
+    }
+    writer.close();
+
+    writer = new BufferedWriter(new FileWriter(new File(right_dir, "r3.json")));
+    for (int i = 100; i < 150; ++i) {
+      writer.write(String.format("{ \"kr2\" : %f , \"vr2\": %f }\n", (float)i, (float)i));
+    }
+    writer.close();
+
+    // INNER JOIN
+    String query = String.format("select * from dfs_test.`%s` L %s join dfs_test.`%s` R on L.kl=R.kr",
+      left_dir.toPath().toString(), "inner", right_dir.toPath().toString());
+
+    TestBuilder builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false; alter session set `exec.enable_union_type` = true")
+      .unOrdered()
+      .baselineColumns("kl", "vl", "kr", "vr", "kl1", "vl1", "kl2", "vl2", "kr1", "vr1", "kr2", "vr2");
+
+    for (long i = 50; i < 100; ++i) {
+      builder.baselineValues(i, i, (double)i, (double)i, null, null, null, null, null, null, null, null);
+    }
+    builder.go();
+
+    // LEFT JOIN
+    query = String.format("select * from dfs_test.`%s` L %s join dfs_test.`%s` R on L.kl=R.kr",
+      left_dir.toPath().toString(), "left", right_dir.toPath().toString());
+
+    builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false; alter session set `exec.enable_union_type` = true")
+      .unOrdered()
+      .baselineColumns("kl", "vl", "kr", "vr", "kl1", "vl1", "kl2", "vl2", "kr1", "vr1", "kr2", "vr2");
+
+    for (long i = 0; i < 50; ++i) {
+      builder.baselineValues(null, null, null, null, i, i, null, null, null, null, null, null);
+    }
+    for (long i = 50; i < 100; ++i) {
+      builder.baselineValues(i, i, (double)i, (double)i, null, null, null, null, null, null, null, null);
+    }
+    for (long i = 100; i < 150; ++i) {
+      builder.baselineValues(null, null, null, null, null, null, i, i, null, null, null, null);
+    }
+    builder.go();
+
+    // RIGHT JOIN
+    query = String.format("select * from dfs_test.`%s` L %s join dfs_test.`%s` R on L.kl=R.kr",
+      left_dir.toPath().toString(), "right", right_dir.toPath().toString());
+
+    builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false; alter session set `exec.enable_union_type` = true")
+      .unOrdered()
+      .baselineColumns("kl", "vl", "kr", "vr", "kl1", "vl1", "kl2", "vl2", "kr1", "vr1", "kr2", "vr2");
+
+    for (long i = 0; i < 50; ++i) {
+      builder.baselineValues(null, null, null, null, null, null, null, null, (double)i, (double)i, null, null);
+    }
+    for (long i = 50; i < 100; ++i) {
+      builder.baselineValues(i, i, (double)i, (double)i, null, null, null, null, null, null, null, null);
+    }
+    for (long i = 100; i < 150; ++i) {
+      builder.baselineValues(null, null, null, null, null, null, null, null, null, null, (double)i, (double)i);
+    }
+    builder.go();
+  }
+
+  @Test
+  public void testOneSideSchemaChanges() throws Exception {
+    final File left_dir = new File(BaseTestQuery.getTempDir("mergejoin-schemachanges-left"));
+    final File right_dir = new File(BaseTestQuery.getTempDir("mergejoin-schemachanges-right"));
+    left_dir.mkdirs();
+    right_dir.mkdirs();
+
+    BufferedWriter writer = new BufferedWriter(new FileWriter(new File(left_dir, "l1.json")));
+    for (int i = 0; i < 50; ++i) {
+      writer.write(String.format("{ \"kl\" : %d , \"vl\": %d }\n", i, i));
+    }
+    for (int i = 50; i < 100; ++i) {
+      writer.write(String.format("{ \"kl\" : %f , \"vl\": %f }\n", (float) i, (float) i));
+    }
+    writer.close();
+
+    writer = new BufferedWriter(new FileWriter(new File(right_dir, "r1.json")));
+    for (int i = 0; i < 50; ++i) {
+      writer.write(String.format("{ \"kl\" : %d , \"vl\": %d }\n", i, i));
+    }
+    writer.close();
+
+    String query = String.format("select * from dfs_test.`%s` L %s join dfs_test.`%s` R on L.kl=R.kl",
+      left_dir.toPath().toString(), "inner", right_dir.toPath().toString());
+    TestBuilder builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false; alter session set `exec.enable_union_type` = true")
+      .unOrdered()
+      .baselineColumns("kl", "vl", "kl0", "vl0");
+
+    for (long i = 0; i < 50; ++i) {
+      builder.baselineValues(i, i, i, i);
+    }
+    builder.go();
+  }
+}


[2/2] drill git commit: DRILL-4182 TopN schema changes support.

Posted by sm...@apache.org.
DRILL-4182 TopN schema changes support.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e529df46
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e529df46
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e529df46

Branch: refs/heads/master
Commit: e529df46054b685d72ff424c58e38a5dcdbc381a
Parents: cc9175c
Author: Amit Hadke <am...@gmail.com>
Authored: Thu Dec 10 00:21:52 2015 -0800
Committer: Steven Phillips <sm...@apache.org>
Committed: Sun Dec 13 23:23:22 2015 -0800

----------------------------------------------------------------------
 .../impl/TopN/PriorityQueueTemplate.java        |  16 +-
 .../exec/physical/impl/TopN/TopNBatch.java      |  75 ++++++-
 .../impl/svremover/RemovingRecordBatch.java     |   2 +-
 .../drill/exec/record/HyperVectorWrapper.java   |   3 +-
 .../apache/drill/exec/record/SchemaUtil.java    | 104 +++++----
 .../drill/exec/record/VectorContainer.java      |   6 +-
 .../apache/drill/exec/util/BatchPrinter.java    |  20 +-
 .../impl/TopN/TestTopNSchemaChanges.java        | 211 +++++++++++++++++++
 8 files changed, 384 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/e529df46/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
index 5cdfc5d..2b1830e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
@@ -70,12 +70,17 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
       newContainer.add(container.getValueAccessorById(field.getValueClass(), ids).getValueVectors());
     }
     newContainer.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
+    // Cleanup before recreating hyperbatch and sv4.
+    cleanup();
     hyperBatch = new ExpandableHyperContainer(newContainer);
     batchCount = hyperBatch.iterator().next().getValueVectors().length;
     final DrillBuf drillBuf = allocator.buffer(4 * (limit + 1));
     heapSv4 = new SelectionVector4(drillBuf, limit, Character.MAX_VALUE);
+    // Reset the queue size; it is most likely still set to limit from before the purge.
+    queueSize = 0;
     for (int i = 0; i < v4.getTotalCount(); i++) {
       heapSv4.set(i, v4.get(i));
+      ++queueSize;
     }
     v4.clear();
     doSetup(context, hyperBatch, null);
@@ -146,8 +151,15 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
 
   @Override
   public void cleanup() {
-    heapSv4.clear();
-    hyperBatch.clear();
+    if (heapSv4 != null) {
+      heapSv4.clear();
+    }
+    if (hyperBatch != null) {
+      hyperBatch.clear();
+    }
+    if (finalSv4 != null) {
+      finalSv4.clear();
+    }
   }
 
   private void siftUp() {
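
The queueSize reset above matters because the old value is typically still
pinned at limit when the queue is rebuilt. A toy analogue using
java.util.PriorityQueue (the real code repopulates an sv4-backed heap over a
hyper batch):

    import java.util.PriorityQueue;

    public class ResetQueueSketch {
      public static void main(String[] args) {
        // Records that survived the purge, already copied out of the old heap.
        long[] survivors = {5L, 1L, 9L};

        PriorityQueue<Long> heap = new PriorityQueue<>();
        int queueSize = 0;  // reset, never carried over from the old heap
        for (long v : survivors) {
          heap.add(v);
          ++queueSize;      // recount while repopulating, as resetQueue() does
        }
        System.out.println(queueSize + " records, min = " + heap.peek());
      }
    }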

http://git-wip-us.apache.org/repos/asf/drill/blob/e529df46/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 54d2839..8c4cf21 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -49,6 +49,7 @@ import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.SchemaUtil;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
@@ -76,6 +77,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
 
   private final RecordBatch incoming;
   private BatchSchema schema;
+  private boolean schemaChanged = false;
   private PriorityQueue priorityQueue;
   private TopN config;
   SelectionVector4 sv4;
@@ -143,6 +145,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
         }
         container.buildSchema(SelectionVectorMode.NONE);
         container.setRecordCount(0);
+
         return;
       case STOP:
         state = BatchState.STOP;
@@ -202,9 +205,16 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
           // only change in the case that the schema truly changes.  Artificial schema changes are ignored.
           if (!incoming.getSchema().equals(schema)) {
             if (schema != null) {
-              throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+              if (!unionTypeEnabled) {
+                throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+              } else {
+                this.schema = SchemaUtil.mergeSchemas(this.schema, incoming.getSchema());
+                purgeAndResetPriorityQueue();
+                this.schemaChanged = true;
+              }
+            } else {
+              this.schema = incoming.getSchema();
             }
-            this.schema = incoming.getSchema();
           }
           // fall through.
         case OK:
@@ -216,11 +226,17 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
           }
           countSincePurge += incoming.getRecordCount();
           batchCount++;
-          RecordBatchData batch = new RecordBatchData(incoming);
+          RecordBatchData batch;
+          if (schemaChanged) {
+            batch = new RecordBatchData(SchemaUtil.coerceContainer(incoming, this.schema, oContext));
+          } else {
+            batch = new RecordBatchData(incoming);
+          }
           boolean success = false;
           try {
             batch.canonicalize();
             if (priorityQueue == null) {
+              assert !schemaChanged;
               priorityQueue = createNewPriorityQueue(context, config.getOrderings(), new ExpandableHyperContainer(batch.getContainer()), MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
             }
             priorityQueue.add(context, batch);
@@ -255,7 +271,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
         container.add(w.getValueVectors());
       }
       container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
-
       recordCount = sv4.getCount();
       return IterOutcome.OK_NEW_SCHEMA;
 
@@ -323,7 +338,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     for (Ordering od : orderings) {
       // first, we rewrite the evaluation stack for each side of the comparison.
       ErrorCollector collector = new ErrorCollectorImpl();
-      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
+      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry(), unionTypeEnabled);
       if (collector.hasErrors()) {
         throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
       }
@@ -356,6 +371,56 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     return q;
   }
 
+  /**
+   * Handles a schema change during execution:
+   * 1. Purge the batches already buffered in the queue.
+   * 2. Coerce the purged records into a new container with the merged schema.
+   * 3. Recreate the priority queue and reset it with the coerced container.
+   * @throws SchemaChangeException
+   */
+  public void purgeAndResetPriorityQueue() throws SchemaChangeException, ClassTransformationException, IOException {
+    final Stopwatch watch = new Stopwatch();
+    watch.start();
+    final VectorContainer c = priorityQueue.getHyperBatch();
+    final VectorContainer newContainer = new VectorContainer(oContext);
+    final SelectionVector4 selectionVector4 = priorityQueue.getHeapSv4();
+    final SimpleRecordBatch batch = new SimpleRecordBatch(c, selectionVector4, context);
+    final SimpleRecordBatch newBatch = new SimpleRecordBatch(newContainer, null, context);
+    copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(),  newContainer, newBatch, null);
+    SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator());
+    try {
+      do {
+        final int count = selectionVector4.getCount();
+        final int copiedRecords = copier.copyRecords(0, count);
+        assert copiedRecords == count;
+        for (VectorWrapper<?> v : newContainer) {
+          ValueVector.Mutator m = v.getValueVector().getMutator();
+          m.setValueCount(count);
+        }
+        newContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+        newContainer.setRecordCount(count);
+        builder.add(newBatch);
+      } while (selectionVector4.next());
+      selectionVector4.clear();
+      c.clear();
+      final VectorContainer oldSchemaContainer = new VectorContainer(oContext);
+      builder.canonicalize();
+      builder.build(context, oldSchemaContainer);
+      oldSchemaContainer.setRecordCount(builder.getSv4().getCount());
+      final VectorContainer newSchemaContainer =  SchemaUtil.coerceContainer(oldSchemaContainer, this.schema, oContext);
+      // Canonicalize new container since we canonicalize incoming batches before adding to queue.
+      final VectorContainer canonicalizedContainer = VectorContainer.canonicalize(newSchemaContainer);
+      canonicalizedContainer.buildSchema(SelectionVectorMode.FOUR_BYTE);
+      priorityQueue.cleanup();
+      priorityQueue = createNewPriorityQueue(context, config.getOrderings(), canonicalizedContainer, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
+      priorityQueue.resetQueue(canonicalizedContainer, builder.getSv4().createNewWrapperCurrent());
+    } finally {
+      builder.clear();
+      builder.close();
+    }
+    logger.debug("Took {} us to purge and recreate queue for new schema", watch.elapsed(TimeUnit.MICROSECONDS));
+  }
+
   @Override
   public WritableBatch getWritableBatch() {
     throw new UnsupportedOperationException("A sort batch is not writable.");
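
Condensing the three javadoc steps of purgeAndResetPriorityQueue() into a toy
you can run, with java.util.PriorityQueue and an int-to-long widening standing
in for the hyper batch and the union-type coercion:

    import java.util.PriorityQueue;

    public class PurgeAndResetSketch {
      public static void main(String[] args) {
        // 1. Purge: drain the heap that was built against the old schema.
        PriorityQueue<Integer> oldQueue = new PriorityQueue<>();
        oldQueue.add(7);
        oldQueue.add(3);
        oldQueue.add(9);

        // 2. Coerce: rewrite each record to the merged schema (here int -> long).
        // 3. Reset: rebuild a fresh queue from the coerced records.
        PriorityQueue<Long> newQueue = new PriorityQueue<>();
        while (!oldQueue.isEmpty()) {
          newQueue.add(oldQueue.poll().longValue());
        }
        System.out.println("rebuilt top = " + newQueue.peek()); // prints 3
      }
    }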

http://git-wip-us.apache.org/repos/asf/drill/blob/e529df46/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index fa6001b..5faaf58 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -62,7 +62,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
 
   @Override
   protected boolean setupNewSchema() throws SchemaChangeException {
-    container.zeroVectors();
+    container.clear();
     switch(incoming.getSchema().getSelectionVectorMode()){
     case NONE:
       this.copier = getStraightCopier();

http://git-wip-us.apache.org/repos/asf/drill/blob/e529df46/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
index a1557e6..7fc7960 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
@@ -132,7 +132,8 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<
   }
 
   public void addVector(ValueVector v) {
-    Preconditions.checkArgument(v.getClass() == this.getVectorClass(), String.format("Cannot add vector type %s to hypervector type %s", v.getClass(), this.getVectorClass()));
+    Preconditions.checkArgument(v.getClass() == this.getVectorClass(), String.format("Cannot add vector type %s to hypervector type %s for field %s",
+      v.getClass(), this.getVectorClass(), v.getField()));
     vectors = (T[]) ArrayUtils.add(vectors, v);// TODO optimize this so not copying every time
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/e529df46/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
index 8a0954e..8cf90ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
@@ -96,6 +96,49 @@ public class SchemaUtil {
     return s;
   }
 
+  private static  ValueVector coerceVector(ValueVector v, VectorContainer c, MaterializedField field,
+                                           int recordCount, OperatorContext context) {
+    if (v != null) {
+      int valueCount = v.getAccessor().getValueCount();
+      TransferPair tp = v.getTransferPair();
+      tp.transfer();
+      if (v.getField().getType().getMinorType().equals(field.getType().getMinorType())) {
+        if (field.getType().getMinorType() == MinorType.UNION) {
+          UnionVector u = (UnionVector) tp.getTo();
+          for (MinorType t : field.getType().getSubTypeList()) {
+            if (u.getField().getType().getSubTypeList().contains(t)) {
+              continue;
+            }
+            u.addSubType(t);
+          }
+        }
+        return tp.getTo();
+      } else {
+        ValueVector newVector = TypeHelper.getNewVector(field, context.getAllocator());
+        Preconditions.checkState(field.getType().getMinorType() == MinorType.UNION, "Can only convert vector to Union vector");
+        UnionVector u = (UnionVector) newVector;
+        u.addVector(tp.getTo());
+        MinorType type = v.getField().getType().getMinorType();
+        for (int i = 0; i < valueCount; i++) {
+          u.getMutator().setType(i, type);
+        }
+        for (MinorType t : field.getType().getSubTypeList()) {
+          if (u.getField().getType().getSubTypeList().contains(t)) {
+            continue;
+          }
+          u.addSubType(t);
+        }
+        u.getMutator().setValueCount(valueCount);
+        return u;
+      }
+    } else {
+      v = TypeHelper.getNewVector(field, context.getAllocator());
+      v.allocateNew();
+      v.getMutator().setValueCount(recordCount);
+      return v;
+    }
+  }
+
   /**
    * Creates a copy of a record batch, converting any fields as necessary to coerce it into the provided schema
    * @param in
@@ -105,54 +148,39 @@ public class SchemaUtil {
    */
   public static VectorContainer coerceContainer(VectorAccessible in, BatchSchema toSchema, OperatorContext context) {
     int recordCount = in.getRecordCount();
-    Map<SchemaPath,ValueVector> vectorMap = Maps.newHashMap();
+    boolean isHyper = false;
+    Map<SchemaPath, Object> vectorMap = Maps.newHashMap();
     for (VectorWrapper w : in) {
-      ValueVector v = w.getValueVector();
-      vectorMap.put(v.getField().getPath(), v);
+      if (w.isHyper()) {
+        isHyper = true;
+        final ValueVector[] vvs = w.getValueVectors();
+        vectorMap.put(vvs[0].getField().getPath(), vvs);
+      } else {
+        assert !isHyper;
+        final ValueVector v = w.getValueVector();
+        vectorMap.put(v.getField().getPath(), v);
+      }
     }
 
     VectorContainer c = new VectorContainer(context);
 
     for (MaterializedField field : toSchema) {
-      ValueVector v = vectorMap.remove(field.getPath());
-      if (v != null) {
-        int valueCount = v.getAccessor().getValueCount();
-        TransferPair tp = v.getTransferPair();
-        tp.transfer();
-        if (v.getField().getType().getMinorType().equals(field.getType().getMinorType())) {
-          if (field.getType().getMinorType() == MinorType.UNION) {
-            UnionVector u = (UnionVector) tp.getTo();
-            for (MinorType t : field.getType().getSubTypeList()) {
-              if (u.getField().getType().getSubTypeList().contains(t)) {
-                continue;
-              }
-              u.addSubType(t);
-            }
-          }
-          c.add(tp.getTo());
+      if (isHyper) {
+        final ValueVector[] vvs = (ValueVector[]) vectorMap.remove(field.getPath());
+        final ValueVector[] vvsOut;
+        if (vvs == null) {
+          vvsOut = new ValueVector[1];
+          vvsOut[0] = coerceVector(null, c, field, recordCount, context);
         } else {
-          ValueVector newVector = TypeHelper.getNewVector(field, context.getAllocator());
-          Preconditions.checkState(field.getType().getMinorType() == MinorType.UNION, "Can only convert vector to Union vector");
-          UnionVector u = (UnionVector) newVector;
-          u.addVector(tp.getTo());
-          MinorType type = v.getField().getType().getMinorType();
-          for (int i = 0; i < valueCount; i++) {
-            u.getMutator().setType(i, type);
-          }
-          for (MinorType t : field.getType().getSubTypeList()) {
-            if (u.getField().getType().getSubTypeList().contains(t)) {
-              continue;
-            }
-            u.addSubType(t);
+          vvsOut = new ValueVector[vvs.length];
+          for (int i = 0; i < vvs.length; ++i) {
+            vvsOut[i] = coerceVector(vvs[i], c, field, recordCount, context);
           }
-          u.getMutator().setValueCount(valueCount);
-          c.add(u);
         }
+        c.add(vvsOut);
       } else {
-        v = TypeHelper.getNewVector(field, context.getAllocator());
-        v.allocateNew();
-        v.getMutator().setValueCount(recordCount);
-        c.add(v);
+        final ValueVector v = (ValueVector) vectorMap.remove(field.getPath());
+        c.add(coerceVector(v, c, field, recordCount, context));
       }
     }
     c.buildSchema(in.getSchema().getSelectionVectorMode());
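
A toy analogue of coerceVector()'s promotion path: when the target field is a
UNION, a single-typed column is wrapped so every value carries a type tag
(compare the setType(i, type) loop in the hunk above). Cell and Kind are
illustrative stand-ins for Drill's UnionVector machinery.

    import java.util.ArrayList;
    import java.util.List;

    public class CoerceSketch {
      enum Kind { BIGINT, FLOAT8 }

      static final class Cell {
        final Kind kind;
        final Object value;
        Cell(Kind kind, Object value) { this.kind = kind; this.value = value; }
      }

      // "Coerce" a plain long column into a union column by tagging each value.
      static List<Cell> promoteToUnion(long[] column) {
        List<Cell> union = new ArrayList<>();
        for (long v : column) {
          union.add(new Cell(Kind.BIGINT, v));
        }
        return union;
      }

      public static void main(String[] args) {
        List<Cell> u = promoteToUnion(new long[] {1, 2, 3});
        System.out.println(u.size() + " tagged values, first kind = " + u.get(0).kind);
      }
    }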

http://git-wip-us.apache.org/repos/asf/drill/blob/e529df46/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index ccc05ff..c483650 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -187,7 +187,11 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
     });
 
     for (VectorWrapper<?> w : canonicalWrappers) {
-      vc.add(w.getValueVector());
+      if (w.isHyper()) {
+        vc.add(w.getValueVectors());
+      } else {
+        vc.add(w.getValueVector());
+      }
     }
     vc.oContext = original.oContext;
     return vc;

http://git-wip-us.apache.org/repos/asf/drill/blob/e529df46/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
index 198c0b5..2a1db01 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
@@ -42,15 +42,25 @@ public class BatchPrinter {
     }
     int width = columns.size();
     for (int j = 0; j < sv4.getCount(); j++) {
+      if (j % 50 == 0) {
+        System.out.println(StringUtils.repeat("-", width * 17 + 1));
+        for (String column : columns) {
+          System.out.printf("| %-15s", column.length() <= 15 ? column : column.substring(0, 14));
+        }
+        System.out.printf("|\n");
+        System.out.println(StringUtils.repeat("-", width * 17 + 1));
+      }
       for (VectorWrapper vw : batch) {
         Object o = vw.getValueVectors()[sv4.get(j) >>> 16].getAccessor().getObject(sv4.get(j) & 65535);
-        if (o instanceof byte[]) {
-          String value = new String((byte[]) o);
-          System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0, 14));
+        String value;
+        if (o == null) {
+          value = "null";
+        } else if (o instanceof byte[]) {
+          value = new String((byte[]) o);
         } else {
-          String value = o.toString();
-          System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0,14));
+          value = o.toString();
         }
+        System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0,14));
       }
       System.out.printf("|\n");
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/e529df46/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
new file mode 100644
index 0000000..0f65bab
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.TopN;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.TestBuilder;
+import org.junit.Test;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+
+public class TestTopNSchemaChanges extends BaseTestQuery {
+
+  @Test
+  public void testNumericTypes() throws Exception {
+    final File data_dir = new File(BaseTestQuery.getTempDir("topn-schemachanges"));
+    data_dir.mkdirs();
+
+    // left side int and strings
+    BufferedWriter writer = new BufferedWriter(new FileWriter(new File(data_dir, "d1.json")));
+    for (int i = 0; i < 10000; i+=2) {
+      writer.write(String.format("{ \"kl\" : %d , \"vl\": %d }\n", i, i));
+    }
+    writer.close();
+    writer = new BufferedWriter(new FileWriter(new File(data_dir, "d2.json")));
+    for (int i = 1; i < 10000; i+=2) {
+      writer.write(String.format("{ \"kl\" : %f , \"vl\": %f }\n", (float)i, (float)i));
+    }
+    writer.close();
+    String query = String.format("select * from dfs_test.`%s` order by kl limit 12", data_dir.toPath().toString());
+
+    TestBuilder builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+      .ordered()
+      .baselineColumns("kl", "vl");
+
+    for (long i = 0; i < 12; ++i) {
+      if (i % 2 == 0) {
+        builder.baselineValues(i, i);
+      } else {
+        builder.baselineValues((double)i, (double)i);
+      }
+    }
+    builder.go();
+  }
+
+  @Test
+  public void testNumericAndStringTypes() throws Exception {
+    final File data_dir = new File(BaseTestQuery.getTempDir("topn-schemachanges"));
+    data_dir.mkdirs();
+
+    // left side int and strings
+    BufferedWriter writer = new BufferedWriter(new FileWriter(new File(data_dir, "d1.json")));
+    for (int i = 0; i < 1000; i+=2) {
+      writer.write(String.format("{ \"kl\" : %d , \"vl\": %d }\n", i, i));
+    }
+    writer.close();
+    writer = new BufferedWriter(new FileWriter(new File(data_dir, "d2.json")));
+    for (int i = 1; i < 1000; i+=2) {
+      writer.write(String.format("{ \"kl\" : \"%s\" , \"vl\": \"%s\" }\n", i, i));
+    }
+    writer.close();
+    String query = String.format("select * from dfs_test.`%s` order by kl limit 12", data_dir.toPath().toString());
+
+    TestBuilder builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+      .ordered()
+      .baselineColumns("kl", "vl");
+
+    for (long i = 0; i < 24; i += 2) {
+      builder.baselineValues(i, i);
+    }
+    builder.go();
+
+    query = String.format("select * from dfs_test.`%s` order by kl desc limit 12", data_dir.toPath().toString());
+    builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+      .ordered()
+      .baselineColumns("kl", "vl")
+      .baselineValues("999", "999")
+      .baselineValues("997", "997")
+      .baselineValues("995", "995")
+      .baselineValues("993", "993")
+      .baselineValues("991", "991")
+      .baselineValues("99", "99")
+      .baselineValues("989", "989")
+      .baselineValues("987", "987")
+      .baselineValues("985", "985")
+      .baselineValues("983", "983")
+      .baselineValues("981", "981")
+      .baselineValues("979", "979");
+    builder.go();
+  }
+
+  @Test
+  public void testUnionTypes() throws Exception {
+    final File data_dir = new File(BaseTestQuery.getTempDir("topn-schemachanges"));
+    data_dir.mkdirs();
+
+    // union of int and float and string.
+    BufferedWriter writer = new BufferedWriter(new FileWriter(new File(data_dir, "d1.json")));
+    for (int i = 0; i <= 9; ++i) {
+      switch (i%3) {
+        case 0: // 0, 3, 6, 9
+          writer.write(String.format("{ \"kl\" : %d , \"vl\": %d }\n", i, i));
+          break;
+        case 1: // 1, 4, 7
+          writer.write(String.format("{ \"kl\" : %f , \"vl\": %f }\n", (float)i, (float)i));
+          break;
+        case 2: // 2, 5, 8
+          writer.write(String.format("{ \"kl\" : \"%s\" , \"vl\": \"%s\" }\n", i, i));
+          break;
+      }
+    }
+    writer.close();
+    String query = String.format("select * from dfs_test.`%s` order by kl limit 8", data_dir.toPath().toString());
+
+    TestBuilder builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+      .ordered()
+      .baselineColumns("kl", "vl");
+
+    builder.baselineValues(0L, 0L);
+    builder.baselineValues(1.0d, 1.0d);
+    builder.baselineValues(3L, 3L);
+    builder.baselineValues(4.0d, 4.0d);
+    builder.baselineValues(6L, 6L);
+    builder.baselineValues(7.0d, 7.0d);
+    builder.baselineValues(9L, 9L);
+    builder.baselineValues("2", "2");
+    builder.go();
+  }
+
+  @Test
+  public void testMissingColumn() throws Exception {
+    final File data_dir = new File(BaseTestQuery.getTempDir("topn-schemachanges"));
+    data_dir.mkdirs();
+    BufferedWriter writer = new BufferedWriter(new FileWriter(new File(data_dir, "d1.json")));
+    for (int i = 0; i < 100; i++) {
+      writer.write(String.format("{ \"kl1\" : %d , \"vl1\": %d }\n", i, i));
+    }
+    writer.close();
+    writer = new BufferedWriter(new FileWriter(new File(data_dir, "d2.json")));
+    for (int i = 100; i < 200; i++) {
+      writer.write(String.format("{ \"kl\" : %f , \"vl\": %f }\n", (float)i, (float)i));
+    }
+    writer.close();
+    writer = new BufferedWriter(new FileWriter(new File(data_dir, "d3.json")));
+    for (int i = 200; i < 300; i++) {
+      writer.write(String.format("{ \"kl2\" : \"%s\" , \"vl2\": \"%s\" }\n", i, i));
+    }
+    writer.close();
+
+    String query = String.format("select kl, vl, kl1, vl1, kl2, vl2 from dfs_test.`%s` order by kl limit 3", data_dir.toPath().toString());
+    TestBuilder builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+      .ordered()
+      .baselineColumns("kl", "vl", "kl1", "vl1", "kl2", "vl2")
+      .baselineValues(100.0d, 100.0d, null, null, null, null)
+      .baselineValues(101.0d, 101.0d, null, null, null, null)
+      .baselineValues(102.0d, 102.0d, null, null, null, null);
+    builder.go();
+
+    query = String.format("select kl, vl, kl1, vl1, kl2, vl2  from dfs_test.`%s` order by kl1 limit 3", data_dir.toPath().toString());
+    builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+      .ordered()
+      .baselineColumns("kl", "vl", "kl1", "vl1", "kl2", "vl2")
+      .baselineValues(null, null, 0L, 0L, null, null)
+      .baselineValues(null, null, 1L, 1L, null, null)
+      .baselineValues(null, null, 2L, 2L, null, null);
+    builder.go();
+
+    query = String.format("select kl, vl, kl1, vl1, kl2, vl2 from dfs_test.`%s` order by kl2 desc limit 3", data_dir.toPath().toString());
+    builder = testBuilder()
+      .sqlQuery(query)
+      .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+      .ordered()
+      .baselineColumns("kl", "vl", "kl1", "vl1", "kl2", "vl2")
+      .baselineValues(null, null, null, null, "299", "299")
+      .baselineValues(null, null, null, null, "298", "298")
+      .baselineValues(null, null, null, null, "297", "297");
+    builder.go();
+    // The client can't handle new columns that are absent from the first batch, so we
+    // don't verify this query's output; it only needs to run without errors.
+    test(String.format("select * from dfs_test.`%s` order by kl limit 3", data_dir.toPath().toString()));
+  }
+}