You are viewing a plain-text version of this content; the link to its canonical version ("here") was not preserved in this rendering.
Posted to commits@drill.apache.org by su...@apache.org on 2017/01/30 20:50:13 UTC
drill git commit: DRILL-3562: Query fails when using flatten on JSON data where some documents have an empty array
Repository: drill
Updated Branches:
refs/heads/master 60624af22 -> 88655adfe
DRILL-3562: Query fails when using flatten on JSON data where some documents have an empty array
closes #713
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/88655adf
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/88655adf
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/88655adf
Branch: refs/heads/master
Commit: 88655adfea8e7884e67559d9ff6053b01890ad7a
Parents: 60624af
Author: Serhii-Harnyk <se...@gmail.com>
Authored: Tue Dec 20 16:55:41 2016 +0000
Committer: Sudheesh Katkam <su...@apache.org>
Committed: Mon Jan 30 10:09:39 2017 -0800
----------------------------------------------------------------------
.../impl/flatten/FlattenRecordBatch.java | 24 +++++--
.../exec/vector/complex/fn/JsonReader.java | 31 ++++++++-
.../vector/complex/writer/TestJsonReader.java | 73 +++++++++++++++++++-
3 files changed, 118 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/88655adf/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index bedf731..fc80b49 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -27,6 +27,7 @@ import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.Types;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
@@ -310,12 +311,23 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
final NamedExpression flattenExpr = new NamedExpression(popConfig.getColumn(), new FieldReference(popConfig.getColumn()));
final ValueVectorReadExpression vectorRead = (ValueVectorReadExpression)ExpressionTreeMaterializer.materialize(flattenExpr.getExpr(), incoming, collector, context.getFunctionRegistry(), true);
- final TransferPair tp = getFlattenFieldTransferPair(flattenExpr.getRef());
-
- if (tp != null) {
- transfers.add(tp);
- container.add(tp.getTo());
- transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
+ final FieldReference fieldReference = flattenExpr.getRef();
+ final TransferPair transferPair = getFlattenFieldTransferPair(fieldReference);
+
+ if (transferPair != null) {
+ final ValueVector flattenVector = transferPair.getTo();
+
+ // checks that list has only default ValueVector and replaces resulting ValueVector to INT typed ValueVector
+ if (exprs.size() == 0 && flattenVector.getField().getType().equals(Types.LATE_BIND_TYPE)) {
+ final MaterializedField outputField = MaterializedField.create(fieldReference.getAsNamePart().getName(), Types.OPTIONAL_INT);
+ final ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
+
+ container.add(vector);
+ } else {
+ transfers.add(transferPair);
+ container.add(flattenVector);
+ transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
+ }
}
logger.debug("Added transfer for project expression.");
http://git-wip-us.apache.org/repos/asf/drill/blob/88655adf/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index 1bc5eaa..9848ae7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -28,9 +28,7 @@ import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.store.easy.json.JsonProcessor.ReadState;
import org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor;
-import org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor.JsonExceptionProcessingState;
import org.apache.drill.exec.vector.complex.fn.VectorOutput.ListVectorOutput;
import org.apache.drill.exec.vector.complex.fn.VectorOutput.MapVectorOutput;
import org.apache.drill.exec.vector.complex.writer.BaseWriter;
@@ -59,6 +57,12 @@ public class JsonReader extends BaseJsonProcessor {
private final boolean readNumbersAsDouble;
/**
+ * Collection for tracking empty array writers during reading
+ * and storing them for initializing empty arrays
+ */
+ private final List<ListWriter> emptyArrayWriters = Lists.newArrayList();
+
+ /**
* Describes whether or not this reader can unwrap a single root array record
* and treat it like a set of distinct records.
*/
@@ -153,6 +157,17 @@ public class JsonReader extends BaseJsonProcessor {
}
}
}
+
+ for (ListWriter field : emptyArrayWriters) {
+ // checks that array has not been initialized
+ if (field.getValueCapacity() == 0) {
+ if (allTextMode) {
+ field.varChar();
+ } else {
+ field.integer();
+ }
+ }
+ }
}
public void setSource(int start, int end, DrillBuf buf) throws IOException {
@@ -544,6 +559,7 @@ public class JsonReader extends BaseJsonProcessor {
}
break;
case END_ARRAY:
+ addIfNotInitialized(list);
case END_OBJECT:
break outside;
@@ -591,6 +607,16 @@ public class JsonReader extends BaseJsonProcessor {
}
+ /**
+ * Checks that list has not been initialized and adds it to the emptyArrayWriters collection.
+ * @param list ListWriter that should be checked
+ */
+ private void addIfNotInitialized(ListWriter list) {
+ if (list.getValueCapacity() == 0) {
+ emptyArrayWriters.add(list);
+ }
+ }
+
private void writeDataAllText(ListWriter list) throws IOException {
list.startList();
outside: while (true) {
@@ -605,6 +631,7 @@ public class JsonReader extends BaseJsonProcessor {
}
break;
case END_ARRAY:
+ addIfNotInitialized(list);
case END_OBJECT:
break outside;
http://git-wip-us.apache.org/repos/asf/drill/blob/88655adf/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
index 1168e37..78c2c4c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
@@ -24,16 +24,16 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.BufferedOutputStream;
+import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
+import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;
import java.util.zip.GZIPOutputStream;
-import com.google.common.base.Joiner;
-
import org.apache.drill.BaseTestQuery;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.util.FileUtils;
@@ -652,4 +652,73 @@ public class TestJsonReader extends BaseTestQuery {
}
}
+  /**
+   * Flattening an empty JSON array ({@code t.a.b.c}) must return an empty result set, both with
+   * {@code store.json.all_text_mode} enabled and with it disabled.
+   */
+  @Test
+  public void testFlattenEmptyArrayWithAllTextMode() throws Exception {
+    final File inputDir = new File(BaseTestQuery.getTempDir("json/input"));
+    inputDir.mkdirs();
+    inputDir.deleteOnExit();
+    final String inputDirName = inputDir.toPath().toString();
+
+    final File jsonFile = new File(inputDir, "empty_array_all_text_mode.json");
+    try (BufferedWriter out = new BufferedWriter(new FileWriter(jsonFile))) {
+      // Single record in which both t.a.b.c and t.a.c are empty arrays.
+      out.write("{ \"a\": { \"b\": { \"c\": [] }, \"c\": [] } }");
+    }
+
+    try {
+      final String query = String.format(
+          "select flatten(t.a.b.c) as c from dfs_test.`%s/empty_array_all_text_mode.json` t", inputDirName);
+
+      // Same query twice: first with all-text mode switched on, then switched off.
+      for (final String allTextMode : new String[] {"true", "false"}) {
+        testBuilder()
+            .sqlQuery(query)
+            .unOrdered()
+            .optionSettingQueriesForTestQuery("alter session set `store.json.all_text_mode` = " + allTextMode)
+            .expectsEmptyResultSet()
+            .go();
+      }
+    } finally {
+      testNoResult("alter session reset `store.json.all_text_mode`");
+    }
+  }
+
+  /**
+   * Flattening an empty JSON array ({@code t.a.b.c}) with {@code exec.enable_union_type} enabled
+   * must return an empty result set, both without touching {@code store.json.all_text_mode} and
+   * with it enabled. Both options are reset in {@code finally}.
+   */
+  @Test
+  public void testFlattenEmptyArrayWithUnionType() throws Exception {
+    File path = new File(BaseTestQuery.getTempDir("json/input"));
+    path.mkdirs();
+    path.deleteOnExit();
+    String pathString = path.toPath().toString();
+
+    try (BufferedWriter writer = new BufferedWriter(new FileWriter(new File(path, "empty_array.json")))) {
+      writer.write("{ \"a\": { \"b\": { \"c\": [] }, \"c\": [] } }");
+    }
+
+    try {
+      String query = String.format("select flatten(t.a.b.c) as c from dfs_test.`%s/empty_array.json` t",
+          pathString);
+
+      testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+          .expectsEmptyResultSet()
+          .go();
+
+      // Both settings are passed in ONE option script so this run does not depend on the union
+      // type having been left enabled by the previous run. Calling optionSettingQueriesForTestQuery()
+      // twice replaces the first script rather than appending to it.
+      // NOTE(review): assumes the test harness runs ';'-separated statements in the option script
+      // individually — confirm against TestBuilder/BaseTestQuery.
+      testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true; "
+              + "alter session set `store.json.all_text_mode` = true")
+          .expectsEmptyResultSet()
+          .go();
+
+    } finally {
+      testNoResult("alter session reset `store.json.all_text_mode`");
+      testNoResult("alter session reset `exec.enable_union_type`");
+    }
+  }
}