You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/11/12 05:11:29 UTC

[12/16] incubator-drill git commit: DRILL-1648: Fix for fast schema issue that was causing compilation issues in downstream operators.

DRILL-1648: Fix for fast schema issue that was causing compilation issues in downstream operators.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e515e621
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e515e621
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e515e621

Branch: refs/heads/master
Commit: e515e6211201ec764d405c9a04b3ce43e4b3259d
Parents: 2f6efea
Author: Jason Altekruse <al...@gmail.com>
Authored: Tue Nov 4 15:08:27 2014 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue Nov 11 16:48:45 2014 -0800

----------------------------------------------------------------------
 .../impl/flatten/FlattenRecordBatch.java        | 60 +++++++++++++++-----
 .../org/apache/drill/TestExampleQueries.java    | 18 ------
 .../exec/physical/impl/flatten/TestFlatten.java | 42 ++++++++++++++
 3 files changed, 87 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e515e621/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 5171a25..129174e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -53,12 +53,15 @@ import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.RepeatedVector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.exec.vector.complex.RepeatedMapVector;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 
 import com.google.common.collect.Lists;
 import com.sun.codemodel.JExpr;
 
+// TODO - handle the case where a user tries to flatten a scalar; it should just act as a projection of all of the
+// columns, exactly as they come in
 public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenRecordBatch.class);
 
@@ -251,8 +254,19 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     incoming.buildSchema();
     if ( ! fastSchemaCalled ) {
       for (VectorWrapper vw : incoming) {
-        ValueVector vector = container.addOrGet(vw.getField());
-        container.add(vector);
+        if (vw.getField().getPath().equals(popConfig.getColumn())) {
+          if (vw.getValueVector() instanceof MapVector) {
+            // fast schema upstream did not report a repeated type
+            // assume it will be repeated in the actual results and it will fail in execution if it is not
+            ValueVector vector = container.addOrGet(vw.getField());
+            container.add(vector);
+          } else {
+            container.add(getFlattenFieldTransferPair().getTo());
+          }
+        } else {
+          ValueVector vector = container.addOrGet(vw.getField());
+          container.add(vector);
+        }
       }
       fastSchemaCalled = true;
       container.buildSchema(SelectionVectorMode.NONE);
@@ -264,6 +278,33 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     }
   }
 
+  /**
+   * The data layout is the same for the actual data within a repeated field, as it is in a scalar vector for
+   * the same sql type. For example, a repeated int vector has a vector of offsets into a regular int vector to
+   * represent the lists. As the data layout for the actual values is the same in the repeated vector as in the
+   * scalar vector of the same type, we can avoid making individual copies for the column being flattened, and just
+   * use vector copies from the inner vector of the repeated field to the resulting scalar vector from the flatten
+   * operation. This is completed after we determine how many records will fit (as we will hit either a batch end, or
+   * the end of one of the other vectors, while we are copying the data of the other vectors alongside each new
+   * flattened value coming out of the repeated field).
+   */
+  private TransferPair getFlattenFieldTransferPair() {
+    ValueVector flattenField = incoming.getValueAccessorById(
+        incoming.getSchema().getColumn(
+            incoming.getValueVectorId(
+                popConfig.getColumn()).getFieldIds()[0]).getValueClass(),
+        incoming.getValueVectorId(popConfig.getColumn()).getFieldIds()).getValueVector();
+
+    TransferPair tp;
+    if (flattenField instanceof RepeatedMapVector) {
+      tp = ((RepeatedMapVector)flattenField).getTransferPairToSingleMap();
+    } else {
+      ValueVector vvIn = ((RepeatedVector)flattenField).getAccessor().getAllChildValues();
+      tp = vvIn.getTransferPair();
+    }
+    return tp;
+  }
+
   @Override
   protected boolean setupNewSchema() throws SchemaChangeException {
     this.allocationVectors = Lists.newArrayList();
@@ -275,25 +316,14 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     final ClassGenerator<Flattener> cg = CodeGenerator.getRoot(Flattener.TEMPLATE_DEFINITION, context.getFunctionRegistry());
     IntOpenHashSet transferFieldIds = new IntOpenHashSet();
 
-    RepeatedVector flattenField = ((RepeatedVector) incoming.getValueAccessorById(
-          incoming.getSchema().getColumn(
-              incoming.getValueVectorId(
-                  popConfig.getColumn()).getFieldIds()[0]).getValueClass(),
-          incoming.getValueVectorId(popConfig.getColumn()).getFieldIds()).getValueVector());
-
     NamedExpression namedExpression = new NamedExpression(popConfig.getColumn(), new FieldReference(popConfig.getColumn()));
     LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true);
     ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
     TypedFieldId id = vectorRead.getFieldId();
     Preconditions.checkNotNull(incoming);
 
-    TransferPair tp = null;
-    if (flattenField instanceof RepeatedMapVector) {
-      tp = ((RepeatedMapVector)flattenField).getTransferPairToSingleMap();
-    } else {
-      ValueVector vvIn = flattenField.getAccessor().getAllChildValues();
-      tp = vvIn.getTransferPair();
-    }
+    TransferPair tp = getFlattenFieldTransferPair();
+
     transfers.add(tp);
     container.add(tp.getTo());
     transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e515e621/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 5b64d15..bb411e7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -34,24 +34,6 @@ public class TestExampleQueries extends BaseTestQuery{
     test("select recipe, c.inventor.name as name, c.inventor.age as age from cp.`parquet/complex.parquet` c");
   }
 
-  @Test
-  public void testFlatten() throws Exception {
-    test("select flatten(kvgen(f1)) as monkey, x " +
-        "from cp.`/store/json/test_flatten_mapify.json`");
-
-    test("select t2.key from (select t.monkey.`value` as val, t.monkey.key as key from (select flatten(kvgen(f1)) as monkey, x " +
-        "from cp.`/store/json/test_flatten_mapify.json`) as t) as t2 where t2.val > 1");
-
-    test("select `integer`, `float`, x, flatten(z), flatten(l) from cp.`/jsoninput/input2_modified.json`");
-
-  }
-
-  @Test
-  @Ignore("Can't be run on classpath since that fs doesn't support glob queries.")
-  public void testWildcard() throws Exception {
-    test("select * from dfs.`/tmp/xx/ab*/*.json`");
-  }
-
   @Test // see DRILL-553
   public void testQueryWithNullValues() throws Exception {
     test("select count(*) from cp.`customer.json` limit 1");

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e515e621/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
new file mode 100644
index 0000000..9514517
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
@@ -0,0 +1,42 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl.flatten;
+
+import org.apache.drill.BaseTestQuery;
+import org.junit.Test;
+
+public class TestFlatten extends BaseTestQuery {
+
+  @Test
+  public void testKVGenFlatten1() throws Exception {
+    test("select flatten(kvgen(f1)) as monkey, x " +
+        "from cp.`/store/json/test_flatten_mapify.json`");
+  }
+
+  @Test
+  public void testTwoFlattens() throws Exception {
+    test("select `integer`, `float`, x, flatten(z), flatten(l) from cp.`/jsoninput/input2_modified.json`");
+  }
+
+  @Test
+  public void testFilterFlattenedRecords() throws Exception {
+    test("select t2.key from (select t.monkey.`value` as val, t.monkey.key as key from (select flatten(kvgen(f1)) as monkey, x " +
+        "from cp.`/store/json/test_flatten_mapify.json`) as t) as t2 where t2.val > 1");
+  }
+
+}