You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2017/02/04 03:58:56 UTC

[6/8] drill git commit: DRILL-5237: FlattenRecordBatch loses nested fields from the schema when returns empty batches for the first time

DRILL-5237: FlattenRecordBatch loses nested fields from the schema when returns empty batches for the first time

This closes #735


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c8fbc386
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c8fbc386
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c8fbc386

Branch: refs/heads/master
Commit: c8fbc386c93484b05307f77efe88333746cb8a5e
Parents: 7913d5d
Author: Serhii-Harnyk <se...@gmail.com>
Authored: Fri Jan 27 15:36:10 2017 +0000
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Fri Feb 3 17:42:29 2017 -0800

----------------------------------------------------------------------
 .../impl/flatten/FlattenRecordBatch.java        | 20 ++++++++---
 .../exec/physical/impl/flatten/TestFlatten.java | 35 ++++++++++++++++++++
 2 files changed, 51 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/c8fbc386/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index fc80b49..cadb3e3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -349,7 +349,6 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
       }
 
       final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true);
-      final MaterializedField outputField = MaterializedField.create(outputName, expr.getMajorType());
       if (collector.hasErrors()) {
         throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
       }
@@ -364,10 +363,23 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
         // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
         ((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) expr).getHolder()).setReference(namedExpression.getRef());
         cg.addExpr(expr);
-      } else{
+      } else {
         // need to do evaluation.
-        @SuppressWarnings("resource")
-        ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
+        final MaterializedField outputField;
+        if (expr instanceof ValueVectorReadExpression) {
+          final TypedFieldId id = ValueVectorReadExpression.class.cast(expr).getFieldId();
+          final ValueVector incomingVector = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
+          // outputField is taken from the incoming schema to avoid losing nested fields
+          // when the first batch is empty.
+          if (incomingVector != null) {
+            outputField = incomingVector.getField().clone();
+          } else {
+            outputField = MaterializedField.create(outputName, expr.getMajorType());
+          }
+        } else {
+          outputField = MaterializedField.create(outputName, expr.getMajorType());
+        }
+        final ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
         allocationVectors.add(vector);
         TypedFieldId fid = container.add(vector);
         ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);

http://git-wip-us.apache.org/repos/asf/drill/blob/c8fbc386/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
index 5895a3f..522a5d0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
@@ -17,16 +17,21 @@
  ******************************************************************************/
 package org.apache.drill.exec.physical.impl.flatten;
 
+import static org.apache.commons.io.FileUtils.deleteQuietly;
 import static org.apache.drill.TestBuilder.listOf;
 import static org.apache.drill.TestBuilder.mapOf;
 import static org.junit.Assert.assertEquals;
 
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
 import java.util.List;
 
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.TestBuilder;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.fn.interp.TestConstantFolding;
+import org.apache.drill.exec.store.easy.json.JSONRecordReader;
 import org.apache.drill.exec.util.JsonStringHashMap;
 import org.junit.Ignore;
 import org.junit.Rule;
@@ -552,5 +557,35 @@ public class TestFlatten extends BaseTestQuery {
 
   }
 
+  @Test // Regression test for DRILL-5237: typeof() on nested field a.a1 must report VARCHAR for a query that flattens "b".
+  public void testFlattenOnEmptyArrayAndNestedMap() throws Exception {
+    File path = new File(BaseTestQuery.getTempDir("json/input")); // scratch directory for the generated JSON input
+    try {
+      path.mkdirs();
+      String pathString = path.toPath().toString(); // passed to sqlQuery() below, presumably to fill the %s placeholder
+
+      try (BufferedWriter writer = new BufferedWriter(new FileWriter(new File(path, "empty_arrays.json")))) {
+        writer.write("{\"a\" : {\"a1\" : \"a1\"}, \"b\" : [1]}\n"); // the only row without "c"; excluded by the "c is not null" filter
+        for (int i = 0; i < JSONRecordReader.DEFAULT_ROWS_PER_BATCH; i++) { // presumably enough rows to fill one reader batch -- TODO confirm
+          writer.write("{\"a\" : {\"a1\" : \"a1\"}, \"b\" : [], \"c\" : 1}\n"); // rows whose "b" array is empty
+        }
+        writer.write("{\"a\" : {\"a1\" : \"a1\"}, \"b\" : [1], \"c\" : 1}"); // last row: non-empty "b" and non-null "c"
+      }
+
+      String query = "select typeof(t1.a.a1) as col from " + // outer query inspects the type of the nested map field
+        "(select t.*, flatten(t.b) as b from dfs_test.`%s/empty_arrays.json` t where t.c is not null) t1";
+
+      testBuilder()
+        .sqlQuery(query, pathString)
+        .unOrdered()
+        .baselineColumns("col")
+        .baselineValues("VARCHAR") // single baseline row; presumably only the last input row survives flatten -- TODO confirm
+        .go();
+
+    } finally {
+      deleteQuietly(path); // remove the generated input whether or not the query succeeded
+    }
+  }
+
 }