Posted to commits@drill.apache.org by su...@apache.org on 2017/01/30 20:50:13 UTC

drill git commit: DRILL-3562: Query fails when using flatten on JSON data where some documents have an empty array

Repository: drill
Updated Branches:
  refs/heads/master 60624af22 -> 88655adfe


DRILL-3562: Query fails when using flatten on JSON data where some documents have an empty array

closes #713


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/88655adf
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/88655adf
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/88655adf

Branch: refs/heads/master
Commit: 88655adfea8e7884e67559d9ff6053b01890ad7a
Parents: 60624af
Author: Serhii-Harnyk <se...@gmail.com>
Authored: Tue Dec 20 16:55:41 2016 +0000
Committer: Sudheesh Katkam <su...@apache.org>
Committed: Mon Jan 30 10:09:39 2017 -0800

----------------------------------------------------------------------
 .../impl/flatten/FlattenRecordBatch.java        | 24 +++++--
 .../exec/vector/complex/fn/JsonReader.java      | 31 ++++++++-
 .../vector/complex/writer/TestJsonReader.java   | 73 +++++++++++++++++++-
 3 files changed, 118 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
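
For context, the scenario fixed here can be reproduced with a document whose arrays are
empty, as exercised by the tests added in this commit. The JSON document and query below
are copied from those tests; the <path> segment is only a placeholder for the test's
temporary directory:

  empty_array.json:
    { "a": { "b": { "c": [] }, "c": [] } }

  select flatten(t.a.b.c) as c from dfs_test.`<path>/empty_array.json` t;

After this change the query returns an empty result set instead of failing: JsonReader now
gives list writers that stayed empty a concrete element type at the end of reading (INT, or
VARCHAR when store.json.all_text_mode is enabled), and FlattenRecordBatch falls back to a
nullable INT vector when the flatten field is still untyped (late bind).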


http://git-wip-us.apache.org/repos/asf/drill/blob/88655adf/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index bedf731..fc80b49 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -27,6 +27,7 @@ import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
@@ -310,12 +311,23 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
 
     final NamedExpression flattenExpr = new NamedExpression(popConfig.getColumn(), new FieldReference(popConfig.getColumn()));
     final ValueVectorReadExpression vectorRead = (ValueVectorReadExpression)ExpressionTreeMaterializer.materialize(flattenExpr.getExpr(), incoming, collector, context.getFunctionRegistry(), true);
-    final TransferPair tp = getFlattenFieldTransferPair(flattenExpr.getRef());
-
-    if (tp != null) {
-      transfers.add(tp);
-      container.add(tp.getTo());
-      transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
+    final FieldReference fieldReference = flattenExpr.getRef();
+    final TransferPair transferPair = getFlattenFieldTransferPair(fieldReference);
+
+    if (transferPair != null) {
+      final ValueVector flattenVector = transferPair.getTo();
+
+      // if the list still holds only the default (late bind typed) ValueVector, replace the resulting ValueVector with an INT-typed one
+      if (exprs.size() == 0 && flattenVector.getField().getType().equals(Types.LATE_BIND_TYPE)) {
+        final MaterializedField outputField = MaterializedField.create(fieldReference.getAsNamePart().getName(), Types.OPTIONAL_INT);
+        final ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
+
+        container.add(vector);
+      } else {
+        transfers.add(transferPair);
+        container.add(flattenVector);
+        transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
+      }
     }
 
     logger.debug("Added transfer for project expression.");

http://git-wip-us.apache.org/repos/asf/drill/blob/88655adf/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index 1bc5eaa..9848ae7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -28,9 +28,7 @@ import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.store.easy.json.JsonProcessor.ReadState;
 import org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor;
-import org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor.JsonExceptionProcessingState;
 import org.apache.drill.exec.vector.complex.fn.VectorOutput.ListVectorOutput;
 import org.apache.drill.exec.vector.complex.fn.VectorOutput.MapVectorOutput;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter;
@@ -59,6 +57,12 @@ public class JsonReader extends BaseJsonProcessor {
   private final boolean readNumbersAsDouble;
 
   /**
+   * Collection for tracking empty array writers encountered during reading,
+   * so that arrays which stayed empty can be given a concrete type afterwards
+   */
+  private final List<ListWriter> emptyArrayWriters = Lists.newArrayList();
+
+  /**
    * Describes whether or not this reader can unwrap a single root array record
    * and treat it like a set of distinct records.
    */
@@ -153,6 +157,17 @@ public class JsonReader extends BaseJsonProcessor {
         }
       }
     }
+
+    for (ListWriter field : emptyArrayWriters) {
+      // give a concrete type only to arrays that were never initialized during reading
+      if (field.getValueCapacity() == 0) {
+        if (allTextMode) {
+          field.varChar();
+        } else {
+          field.integer();
+        }
+      }
+    }
   }
 
   public void setSource(int start, int end, DrillBuf buf) throws IOException {
@@ -544,6 +559,7 @@ public class JsonReader extends BaseJsonProcessor {
           }
           break;
         case END_ARRAY:
+          addIfNotInitialized(list);
         case END_OBJECT:
           break outside;
 
@@ -591,6 +607,16 @@ public class JsonReader extends BaseJsonProcessor {
 
   }
 
+  /**
+   * Adds the given list writer to the emptyArrayWriters collection if it has not been initialized yet.
+   * @param list ListWriter that should be checked
+   */
+  private void addIfNotInitialized(ListWriter list) {
+    if (list.getValueCapacity() == 0) {
+      emptyArrayWriters.add(list);
+    }
+  }
+
   private void writeDataAllText(ListWriter list) throws IOException {
     list.startList();
     outside: while (true) {
@@ -605,6 +631,7 @@ public class JsonReader extends BaseJsonProcessor {
         }
         break;
       case END_ARRAY:
+        addIfNotInitialized(list);
       case END_OBJECT:
         break outside;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/88655adf/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
index 1168e37..78c2c4c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
@@ -24,16 +24,16 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.BufferedOutputStream;
+import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.List;
 import java.util.zip.GZIPOutputStream;
 
-import com.google.common.base.Joiner;
-
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.util.FileUtils;
@@ -652,4 +652,73 @@ public class TestJsonReader extends BaseTestQuery {
     }
   }
 
+  @Test
+  public void testFlattenEmptyArrayWithAllTextMode() throws Exception {
+    File path = new File(BaseTestQuery.getTempDir("json/input"));
+    path.mkdirs();
+    path.deleteOnExit();
+    String pathString = path.toPath().toString();
+
+    try (BufferedWriter writer = new BufferedWriter(new FileWriter(new File(path, "empty_array_all_text_mode.json")))) {
+      writer.write("{ \"a\": { \"b\": { \"c\": [] }, \"c\": [] } }");
+    }
+
+    try {
+      String query = String.format("select flatten(t.a.b.c) as c from dfs_test.`%s/empty_array_all_text_mode.json` t",
+        pathString);
+
+      testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .optionSettingQueriesForTestQuery("alter session set `store.json.all_text_mode` = true")
+        .expectsEmptyResultSet()
+        .go();
+
+      testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .optionSettingQueriesForTestQuery("alter session set `store.json.all_text_mode` = false")
+        .expectsEmptyResultSet()
+        .go();
+
+    } finally {
+      testNoResult("alter session reset `store.json.all_text_mode`");
+    }
+  }
+
+  @Test
+  public void testFlattenEmptyArrayWithUnionType() throws Exception {
+    File path = new File(BaseTestQuery.getTempDir("json/input"));
+    path.mkdirs();
+    path.deleteOnExit();
+    String pathString = path.toPath().toString();
+
+    try (BufferedWriter writer = new BufferedWriter(new FileWriter(new File(path, "empty_array.json")))) {
+      writer.write("{ \"a\": { \"b\": { \"c\": [] }, \"c\": [] } }");
+    }
+
+    try {
+      String query = String.format("select flatten(t.a.b.c) as c from dfs_test.`%s/empty_array.json` t",
+        pathString);
+
+      testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+        .expectsEmptyResultSet()
+        .go();
+
+      testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+        .optionSettingQueriesForTestQuery("alter session set `store.json.all_text_mode` = true")
+        .expectsEmptyResultSet()
+        .go();
+
+    } finally {
+      testNoResult("alter session reset `store.json.all_text_mode`");
+      testNoResult("alter session reset `exec.enable_union_type`");
+    }
+  }
 }