You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/11/12 05:11:29 UTC
[12/16] incubator-drill git commit: DRILL-1648: Fix for fast schema
issue that was causing compilation issues in downstream operators.
DRILL-1648: Fix for fast schema issue that was causing compilation issues in downstream operators.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e515e621
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e515e621
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e515e621
Branch: refs/heads/master
Commit: e515e6211201ec764d405c9a04b3ce43e4b3259d
Parents: 2f6efea
Author: Jason Altekruse <al...@gmail.com>
Authored: Tue Nov 4 15:08:27 2014 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue Nov 11 16:48:45 2014 -0800
----------------------------------------------------------------------
.../impl/flatten/FlattenRecordBatch.java | 60 +++++++++++++++-----
.../org/apache/drill/TestExampleQueries.java | 18 ------
.../exec/physical/impl/flatten/TestFlatten.java | 42 ++++++++++++++
3 files changed, 87 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e515e621/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 5171a25..129174e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -53,12 +53,15 @@ import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.RepeatedVector;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.MapVector;
import org.apache.drill.exec.vector.complex.RepeatedMapVector;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
import com.google.common.collect.Lists;
import com.sun.codemodel.JExpr;
+// TODO - handle the case where a user tries to flatten a scalar; it should just act as a project of all of the columns,
+// exactly as they come in
public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenRecordBatch.class);
@@ -251,8 +254,19 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
incoming.buildSchema();
if ( ! fastSchemaCalled ) {
for (VectorWrapper vw : incoming) {
- ValueVector vector = container.addOrGet(vw.getField());
- container.add(vector);
+ if (vw.getField().getPath().equals(popConfig.getColumn())) {
+ if (vw.getValueVector() instanceof MapVector) {
+ // fast schema upstream did not report a repeated type
+ // assume it will be repeated in the actual results and it will fail in execution if it is not
+ ValueVector vector = container.addOrGet(vw.getField());
+ container.add(vector);
+ } else {
+ container.add(getFlattenFieldTransferPair().getTo());
+ }
+ } else {
+ ValueVector vector = container.addOrGet(vw.getField());
+ container.add(vector);
+ }
}
fastSchemaCalled = true;
container.buildSchema(SelectionVectorMode.NONE);
@@ -264,6 +278,33 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
}
}
+ /**
+ * The data layout is the same for the actual data within a repeated field, as it is in a scalar vector for
+ * the same sql type. For example, a repeated int vector has a vector of offsets into a regular int vector to
+ * represent the lists. As the data layout for the actual values is the same in the repeated vector as in the
+ * scalar vector of the same type, we can avoid making individual copies for the column being flattened, and just
+ * use vector transfers from the inner vector of the repeated field to the resulting scalar vector from the flatten
+ * operation. This is completed after we determine how many records will fit (as we will hit either a batch end, or
+ * the end of one of the other vectors while we are copying the data of the other vectors alongside each new flattened
+ * value coming out of the repeated field.)
+ */
+ private TransferPair getFlattenFieldTransferPair() {
+ ValueVector flattenField = incoming.getValueAccessorById(
+ incoming.getSchema().getColumn(
+ incoming.getValueVectorId(
+ popConfig.getColumn()).getFieldIds()[0]).getValueClass(),
+ incoming.getValueVectorId(popConfig.getColumn()).getFieldIds()).getValueVector();
+
+ TransferPair tp;
+ if (flattenField instanceof RepeatedMapVector) {
+ tp = ((RepeatedMapVector)flattenField).getTransferPairToSingleMap();
+ } else {
+ ValueVector vvIn = ((RepeatedVector)flattenField).getAccessor().getAllChildValues();
+ tp = vvIn.getTransferPair();
+ }
+ return tp;
+ }
+
@Override
protected boolean setupNewSchema() throws SchemaChangeException {
this.allocationVectors = Lists.newArrayList();
@@ -275,25 +316,14 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
final ClassGenerator<Flattener> cg = CodeGenerator.getRoot(Flattener.TEMPLATE_DEFINITION, context.getFunctionRegistry());
IntOpenHashSet transferFieldIds = new IntOpenHashSet();
- RepeatedVector flattenField = ((RepeatedVector) incoming.getValueAccessorById(
- incoming.getSchema().getColumn(
- incoming.getValueVectorId(
- popConfig.getColumn()).getFieldIds()[0]).getValueClass(),
- incoming.getValueVectorId(popConfig.getColumn()).getFieldIds()).getValueVector());
-
NamedExpression namedExpression = new NamedExpression(popConfig.getColumn(), new FieldReference(popConfig.getColumn()));
LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true);
ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
TypedFieldId id = vectorRead.getFieldId();
Preconditions.checkNotNull(incoming);
- TransferPair tp = null;
- if (flattenField instanceof RepeatedMapVector) {
- tp = ((RepeatedMapVector)flattenField).getTransferPairToSingleMap();
- } else {
- ValueVector vvIn = flattenField.getAccessor().getAllChildValues();
- tp = vvIn.getTransferPair();
- }
+ TransferPair tp = getFlattenFieldTransferPair();
+
transfers.add(tp);
container.add(tp.getTo());
transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e515e621/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 5b64d15..bb411e7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -34,24 +34,6 @@ public class TestExampleQueries extends BaseTestQuery{
test("select recipe, c.inventor.name as name, c.inventor.age as age from cp.`parquet/complex.parquet` c");
}
- @Test
- public void testFlatten() throws Exception {
- test("select flatten(kvgen(f1)) as monkey, x " +
- "from cp.`/store/json/test_flatten_mapify.json`");
-
- test("select t2.key from (select t.monkey.`value` as val, t.monkey.key as key from (select flatten(kvgen(f1)) as monkey, x " +
- "from cp.`/store/json/test_flatten_mapify.json`) as t) as t2 where t2.val > 1");
-
- test("select `integer`, `float`, x, flatten(z), flatten(l) from cp.`/jsoninput/input2_modified.json`");
-
- }
-
- @Test
- @Ignore("Can't be run on classpath since that fs doesn't support glob queries.")
- public void testWildcard() throws Exception {
- test("select * from dfs.`/tmp/xx/ab*/*.json`");
- }
-
@Test // see DRILL-553
public void testQueryWithNullValues() throws Exception {
test("select count(*) from cp.`customer.json` limit 1");
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e515e621/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
new file mode 100644
index 0000000..9514517
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
@@ -0,0 +1,42 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl.flatten;
+
+import org.apache.drill.BaseTestQuery;
+import org.junit.Test;
+
+public class TestFlatten extends BaseTestQuery {
+
+ @Test
+ public void testKVGenFlatten1() throws Exception {
+ test("select flatten(kvgen(f1)) as monkey, x " +
+ "from cp.`/store/json/test_flatten_mapify.json`");
+ }
+
+ @Test
+ public void testTwoFlattens() throws Exception {
+ test("select `integer`, `float`, x, flatten(z), flatten(l) from cp.`/jsoninput/input2_modified.json`");
+ }
+
+ @Test
+ public void testFilterFlattenedRecords() throws Exception {
+ test("select t2.key from (select t.monkey.`value` as val, t.monkey.key as key from (select flatten(kvgen(f1)) as monkey, x " +
+ "from cp.`/store/json/test_flatten_mapify.json`) as t) as t2 where t2.val > 1");
+ }
+
+}