You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2017/02/04 03:58:56 UTC
[6/8] drill git commit: DRILL-5237: FlattenRecordBatch loses nested fields from the schema when returns empty batches for the first time
DRILL-5237: FlattenRecordBatch loses nested fields from the schema when returns empty batches for the first time
This closes #735
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c8fbc386
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c8fbc386
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c8fbc386
Branch: refs/heads/master
Commit: c8fbc386c93484b05307f77efe88333746cb8a5e
Parents: 7913d5d
Author: Serhii-Harnyk <se...@gmail.com>
Authored: Fri Jan 27 15:36:10 2017 +0000
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Fri Feb 3 17:42:29 2017 -0800
----------------------------------------------------------------------
.../impl/flatten/FlattenRecordBatch.java | 20 ++++++++---
.../exec/physical/impl/flatten/TestFlatten.java | 35 ++++++++++++++++++++
2 files changed, 51 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/c8fbc386/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index fc80b49..cadb3e3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -349,7 +349,6 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
}
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true);
- final MaterializedField outputField = MaterializedField.create(outputName, expr.getMajorType());
if (collector.hasErrors()) {
throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
}
@@ -364,10 +363,23 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
// The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) expr).getHolder()).setReference(namedExpression.getRef());
cg.addExpr(expr);
- } else{
+ } else {
// need to do evaluation.
- @SuppressWarnings("resource")
- ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
+ final MaterializedField outputField;
+ if (expr instanceof ValueVectorReadExpression) {
+ final TypedFieldId id = ValueVectorReadExpression.class.cast(expr).getFieldId();
+ final ValueVector incomingVector = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
+ // outputField is taken from the incoming schema to avoid the loss of nested fields
+ // when the first batch is empty.
+ if (incomingVector != null) {
+ outputField = incomingVector.getField().clone();
+ } else {
+ outputField = MaterializedField.create(outputName, expr.getMajorType());
+ }
+ } else {
+ outputField = MaterializedField.create(outputName, expr.getMajorType());
+ }
+ final ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
allocationVectors.add(vector);
TypedFieldId fid = container.add(vector);
ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
http://git-wip-us.apache.org/repos/asf/drill/blob/c8fbc386/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
index 5895a3f..522a5d0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
@@ -17,16 +17,21 @@
******************************************************************************/
package org.apache.drill.exec.physical.impl.flatten;
+import static org.apache.commons.io.FileUtils.deleteQuietly;
import static org.apache.drill.TestBuilder.listOf;
import static org.apache.drill.TestBuilder.mapOf;
import static org.junit.Assert.assertEquals;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
import java.util.List;
import org.apache.drill.BaseTestQuery;
import org.apache.drill.TestBuilder;
import org.apache.drill.common.util.FileUtils;
import org.apache.drill.exec.fn.interp.TestConstantFolding;
+import org.apache.drill.exec.store.easy.json.JSONRecordReader;
import org.apache.drill.exec.util.JsonStringHashMap;
import org.junit.Ignore;
import org.junit.Rule;
@@ -552,5 +557,35 @@ public class TestFlatten extends BaseTestQuery {
}
+ @Test
+ public void testFlattenOnEmptyArrayAndNestedMap() throws Exception {
+ File path = new File(BaseTestQuery.getTempDir("json/input"));
+ try {
+ path.mkdirs();
+ String pathString = path.toPath().toString();
+
+ try (BufferedWriter writer = new BufferedWriter(new FileWriter(new File(path, "empty_arrays.json")))) {
+ writer.write("{\"a\" : {\"a1\" : \"a1\"}, \"b\" : [1]}\n");
+ for (int i = 0; i < JSONRecordReader.DEFAULT_ROWS_PER_BATCH; i++) {
+ writer.write("{\"a\" : {\"a1\" : \"a1\"}, \"b\" : [], \"c\" : 1}\n");
+ }
+ writer.write("{\"a\" : {\"a1\" : \"a1\"}, \"b\" : [1], \"c\" : 1}");
+ }
+
+ String query = "select typeof(t1.a.a1) as col from " +
+ "(select t.*, flatten(t.b) as b from dfs_test.`%s/empty_arrays.json` t where t.c is not null) t1";
+
+ testBuilder()
+ .sqlQuery(query, pathString)
+ .unOrdered()
+ .baselineColumns("col")
+ .baselineValues("VARCHAR")
+ .go();
+
+ } finally {
+ deleteQuietly(path);
+ }
+ }
+
}