You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by gp...@apache.org on 2019/09/08 03:00:12 UTC
[drill] 02/04: DRILL-7362: COUNT(*) on JSON with outer list results in JsonParse error
This is an automated email from the ASF dual-hosted git repository.
gparai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 797524dd2df4a18ef17752aff501a9e099f50a93
Author: ozinoviev <oz...@solit-clouds.ru>
AuthorDate: Thu Aug 29 15:14:17 2019 +0300
DRILL-7362: COUNT(*) on JSON with outer list results in JsonParse error
closes #1849
---
.../store/easy/json/reader/BaseJsonReader.java | 167 +++++++++++++++++++++
.../store/easy/json/reader/CountingJsonReader.java | 47 ++----
.../drill/exec/vector/complex/fn/JsonReader.java | 126 ++--------------
.../exec/store/json/TestJsonRecordReader.java | 13 ++
4 files changed, 203 insertions(+), 150 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonReader.java
new file mode 100644
index 0000000..983aa9f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonReader.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.reader;
+
+import com.fasterxml.jackson.core.JsonToken;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Basic reader implementation for json documents.
+ * <p>
+ * Handles the top-level structure of the input (a set of white space delimited
+ * maps or, when {@code skipOuterList} is enabled, a single array of maps) and
+ * delegates writing each document to {@link #writeDocument(ComplexWriter, JsonToken)}.
+ */
+public abstract class BaseJsonReader extends BaseJsonProcessor {
+
+ private static final Logger logger = LoggerFactory.getLogger(BaseJsonReader.class);
+
+ /**
+ * Describes whether or not this reader can unwrap a single root array record
+ * and treat it like a set of distinct records.
+ */
+ private final boolean skipOuterList;
+
+ /**
+ * Whether the reader is currently in a situation where we are unwrapping an
+ * outer list.
+ */
+ private boolean inOuterList;
+
+ /**
+ * @param workBuf work buffer, passed to {@link BaseJsonProcessor}
+ * @param enableNanInf passed to {@link BaseJsonProcessor}
+ * @param enableEscapeAnyChar passed to {@link BaseJsonProcessor}
+ * @param skipOuterList whether a single top-level array of maps is unwrapped
+ * and each of its maps read as a separate record
+ */
+ public BaseJsonReader(DrillBuf workBuf, boolean enableNanInf, boolean enableEscapeAnyChar, boolean skipOuterList) {
+ super(workBuf, enableNanInf, enableEscapeAnyChar);
+ this.skipOuterList = skipOuterList;
+ }
+
+ /**
+ * Reads the next top-level value from the parser and writes it via
+ * {@link #writeDocument(ComplexWriter, JsonToken)}.
+ *
+ * @return {@link ReadState#WRITE_SUCCEED} if a document was written,
+ * {@link ReadState#END_OF_STREAM} if there is nothing left to read, or
+ * {@link ReadState#JSON_RECORD_PARSE_ERROR} /
+ * {@link ReadState#JSON_RECORD_PARSE_EOF_ERROR} if a parse error was
+ * skipped because JSON parse errors are being ignored
+ * @throws IOException on read failures, including a {@code JsonParseException}
+ * when JSON parse errors are not being ignored
+ */
+ @Override
+ public ReadState write(ComplexWriter writer) throws IOException {
+
+ try {
+ // Reuse the token left over from the previous call unless there is none
+ // or it closed the previous document.
+ JsonToken t = lastSeenJsonToken;
+ if (t == null || t == JsonToken.END_OBJECT) {
+ t = parser.nextToken();
+ }
+ while (!parser.hasCurrentToken() && !parser.isClosed()) {
+ t = parser.nextToken();
+ }
+ lastSeenJsonToken = null;
+
+ if (parser.isClosed()) {
+ return ReadState.END_OF_STREAM;
+ }
+
+ ReadState readState = writeToVector(writer, t);
+
+ switch (readState) {
+ case END_OF_STREAM:
+ case WRITE_SUCCEED:
+ return readState;
+ default:
+ throw getExceptionWithContext(UserException.dataReadError(), null).message(
+ "Failure while reading JSON. (Got an invalid read state %s )", readState.toString())
+ .build(logger);
+ }
+ } catch (com.fasterxml.jackson.core.JsonParseException ex) {
+ if (ignoreJSONParseError()) {
+ if (processJSONException() == JsonExceptionProcessingState.END_OF_STREAM) {
+ return ReadState.JSON_RECORD_PARSE_EOF_ERROR;
+ } else {
+ return ReadState.JSON_RECORD_PARSE_ERROR;
+ }
+ } else {
+ throw ex;
+ }
+ }
+ }
+
+ /**
+ * Handles one top-level token: a map, the opening or closing bracket of an
+ * outer list, or an unsupported scalar.
+ *
+ * @param t the current top-level token
+ * @return {@link ReadState#WRITE_SUCCEED} after a document was written, or
+ * {@link ReadState#END_OF_STREAM} when an unwrapped outer list (possibly
+ * empty) has ended or no token is available
+ */
+ private ReadState writeToVector(ComplexWriter writer, JsonToken t)
+ throws IOException {
+
+ switch (t) {
+ case START_OBJECT:
+ writeDocument(writer, t);
+ break;
+ case START_ARRAY:
+ if (inOuterList) {
+ throw createDocumentTopLevelException();
+ }
+
+ if (skipOuterList) {
+ t = parser.nextToken();
+ if (t == JsonToken.START_OBJECT) {
+ inOuterList = true;
+ writeDocument(writer, t);
+ } else if (t == JsonToken.END_ARRAY) {
+ // An empty outer list ("[]") contains no records; it is still a valid
+ // single array of maps, so only check that nothing follows it.
+ confirmLast();
+ return ReadState.END_OF_STREAM;
+ } else {
+ throw createDocumentTopLevelException();
+ }
+ } else {
+ writeDocument(writer, t);
+ }
+ break;
+ case END_ARRAY:
+ if (inOuterList) {
+ confirmLast();
+ return ReadState.END_OF_STREAM;
+ } else {
+ throw getExceptionWithContext(UserException.dataReadError(), null).message(
+ "Failure while parsing JSON. Ran across unexpected %s.", JsonToken.END_ARRAY).build(logger);
+ }
+ case NOT_AVAILABLE:
+ return ReadState.END_OF_STREAM;
+ default:
+ throw getExceptionWithContext(UserException.dataReadError(), null)
+ .message(
+ "Failure while parsing JSON. Found token of [%s]. Drill currently only supports parsing "
+ + "json strings that contain either lists or maps. The root object cannot be a scalar.",
+ t).build(logger);
+ }
+
+ return ReadState.WRITE_SUCCEED;
+ }
+
+ /**
+ * Writes the contents of the json node starting with the specified token into a complex vector.
+ * Token can take the following values:
+ * <ul>
+ * <li>{@code START_ARRAY} - the top level of json document is an array and skipping of the
+ * outer list is disabled</li>
+ * <li>{@code START_OBJECT} - the top level of json document is a set of white space delimited
+ * maps or skipping of the outer list is enabled</li>
+ * </ul>
+ */
+ protected abstract void writeDocument(ComplexWriter writer, JsonToken t) throws IOException;
+
+ /**
+ * Builds the data-read error raised when the top level of the input is neither a
+ * single array of maps nor a set of white space delimited maps.
+ */
+ protected UserException createDocumentTopLevelException() {
+ String message = "The top level of your document must either be a single array of maps or a set "
+ + "of white space delimited maps.";
+ return getExceptionWithContext(UserException.dataReadError(), message).build(logger);
+ }
+
+ /**
+ * Called once the closing bracket of an unwrapped outer list has been read: advances the
+ * parser and raises a data-read error if the parser is not closed afterwards (trailing content).
+ */
+ private void confirmLast() throws IOException {
+ parser.nextToken();
+ if (!parser.isClosed()) {
+ String message = "Drill attempted to unwrap a toplevel list in your document. "
+ + "However, it appears that there is trailing content after this top level list. Drill only "
+ + "supports querying a set of distinct maps or a single json array with multiple inner maps.";
+ throw getExceptionWithContext(UserException.dataReadError(), message).build(logger);
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
index 73b93f4..c9bcb0d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
@@ -23,50 +23,29 @@ import com.fasterxml.jackson.core.JsonToken;
import io.netty.buffer.DrillBuf;
-import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
-public class CountingJsonReader extends BaseJsonProcessor {
+/**
+ * Reader that counts records instead of reading their contents: for every
+ * top-level document it writes a single {@code count} bit and skips the
+ * document's children. The outer list is always unwrapped (the {@code true}
+ * passed to the super constructor), so a file holding a single array of maps
+ * yields one record per map.
+ */
+public class CountingJsonReader extends BaseJsonReader {
public CountingJsonReader(DrillBuf workBuf, boolean enableNanInf, boolean enableEscapeAnyChar) {
- super(workBuf, enableNanInf, enableEscapeAnyChar);
+ // skipOuterList = true: unwrap a single top-level array of maps (DRILL-7362).
+ super(workBuf, enableNanInf, enableEscapeAnyChar, true);
}
+ /**
+ * Marks one record by writing {@code 1} to the {@code count} bit of the root map,
+ * then skips the value's contents without reading them. {@code START_ARRAY} is
+ * accepted too, although with the outer list always unwrapped the base class only
+ * passes {@code START_OBJECT} here.
+ */
@Override
- public ReadState write(BaseWriter.ComplexWriter writer) throws IOException {
- try {
- JsonToken token = lastSeenJsonToken;
- if (token == null || token == JsonToken.END_OBJECT){
- token = parser.nextToken();
- }
- lastSeenJsonToken = null;
- if (token == JsonToken.FIELD_NAME) {
- currentFieldName = parser.getText();
- }
- if (!parser.hasCurrentToken()) {
- return ReadState.END_OF_STREAM;
- } else if (token != JsonToken.START_OBJECT) {
- throw new com.fasterxml.jackson.core.JsonParseException(
- parser, String.format("Cannot read from the middle of a record. Current token was %s ", token));
- }
- writer.rootAsMap().bit("count").writeBit(1);
- parser.skipChildren();
- } catch (com.fasterxml.jackson.core.JsonParseException ex) {
- if (ignoreJSONParseError()) {
- if (processJSONException() == JsonExceptionProcessingState.END_OF_STREAM){
- return ReadState.JSON_RECORD_PARSE_EOF_ERROR;
- }
- else{
- return ReadState.JSON_RECORD_PARSE_ERROR;
- }
- } else {
- throw ex;
- }
+ protected void writeDocument(ComplexWriter writer, JsonToken t) throws IOException {
+ switch (t) {
+ case START_OBJECT:
+ case START_ARRAY:
+ writer.rootAsMap().bit("count").writeBit(1);
+ parser.skipChildren();
+ break;
+ default:
+ throw createDocumentTopLevelException();
}
- return ReadState.WRITE_SUCCEED;
}
@Override
- public void ensureAtLeastOneField(BaseWriter.ComplexWriter writer) {
+ public void ensureAtLeastOneField(ComplexWriter writer) {
+ // Intentionally left empty: this reader adds no placeholder field.
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index 3426243..ec838d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -26,7 +26,7 @@ import java.util.List;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor;
+import org.apache.drill.exec.store.easy.json.reader.BaseJsonReader;
import org.apache.drill.exec.vector.complex.fn.VectorOutput.ListVectorOutput;
import org.apache.drill.exec.vector.complex.fn.VectorOutput.MapVectorOutput;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
@@ -40,7 +40,7 @@ import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-public class JsonReader extends BaseJsonProcessor {
+public class JsonReader extends BaseJsonReader {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
.getLogger(JsonReader.class);
public final static int MAX_RECORD_SIZE = 128 * 1024;
@@ -59,25 +59,12 @@ public class JsonReader extends BaseJsonProcessor {
*/
private final List<ListWriter> emptyArrayWriters = Lists.newArrayList();
- /**
- * Describes whether or not this reader can unwrap a single root array record
- * and treat it like a set of distinct records.
- */
- private final boolean skipOuterList;
-
- /**
- * Whether the reader is currently in a situation where we are unwrapping an
- * outer list.
- */
- private boolean inOuterList;
-
private FieldSelection selection;
private JsonReader(Builder builder) {
- super(builder.managedBuf, builder.enableNanInf, builder.enableEscapeAnyChar);
+ super(builder.managedBuf, builder.enableNanInf, builder.enableEscapeAnyChar, builder.skipOuterList);
selection = FieldSelection.getFieldSelection(builder.columns);
workingBuffer = builder.workingBuffer;
- skipOuterList = builder.skipOuterList;
allTextMode = builder.allTextMode;
columns = builder.columns;
mapOutput = builder.mapOutput;
@@ -184,110 +171,17 @@ public class JsonReader extends BaseJsonProcessor {
}
+ /**
+ * Writes one top-level document: a map through the root map writer, an array
+ * through the root list writer. Any other token is rejected with the
+ * top-level-document data-read error.
+ */
@Override
- public ReadState write(ComplexWriter writer) throws IOException {
-
- ReadState readState = null;
- try {
- JsonToken t = lastSeenJsonToken;
- if (t == null || t == JsonToken.END_OBJECT) {
- t = parser.nextToken();
- }
- while (!parser.hasCurrentToken() && !parser.isClosed()) {
- t = parser.nextToken();
- }
- lastSeenJsonToken = null;
-
- if (parser.isClosed()) {
- return ReadState.END_OF_STREAM;
- }
-
- readState = writeToVector(writer, t);
-
- switch (readState) {
- case END_OF_STREAM:
+ protected void writeDocument(ComplexWriter writer, JsonToken t) throws IOException {
+ switch (t) {
+ case START_OBJECT:
+ writeDataSwitch(writer.rootAsMap());
break;
- case WRITE_SUCCEED:
+ case START_ARRAY:
+ writeDataSwitch(writer.rootAsList());
break;
default:
- throw getExceptionWithContext(UserException.dataReadError(), null).message(
- "Failure while reading JSON. (Got an invalid read state %s )", readState.toString())
- .build(logger);
- }
- } catch (com.fasterxml.jackson.core.JsonParseException ex) {
- if (ignoreJSONParseError()) {
- if (processJSONException() == JsonExceptionProcessingState.END_OF_STREAM) {
- return ReadState.JSON_RECORD_PARSE_EOF_ERROR;
- } else {
- return ReadState.JSON_RECORD_PARSE_ERROR;
- }
- } else {
- throw ex;
- }
- }
- return readState;
- }
-
- private void confirmLast() throws IOException {
- parser.nextToken();
- if (!parser.isClosed()) {
- String message = "Drill attempted to unwrap a toplevel list in your document. "
- + "However, it appears that there is trailing content after this top level list. Drill only "
- + "supports querying a set of distinct maps or a single json array with multiple inner maps.";
- throw getExceptionWithContext(UserException.dataReadError(), message).build(logger);
- }
- }
-
- private ReadState writeToVector(ComplexWriter writer, JsonToken t)
- throws IOException {
-
- switch (t) {
- case START_OBJECT:
- writeDataSwitch(writer.rootAsMap());
- break;
- case START_ARRAY:
- if (inOuterList) {
- String message = "The top level of your document must either be a single array of maps or a set "
- + "of white space delimited maps.";
- throw getExceptionWithContext(UserException.dataReadError(), message).build(logger);
- }
-
- if (skipOuterList) {
- t = parser.nextToken();
- if (t == JsonToken.START_OBJECT) {
- inOuterList = true;
- writeDataSwitch(writer.rootAsMap());
- } else {
- String message = "The top level of your document must either be a single array of maps or a set "
- + "of white space delimited maps.";
- throw getExceptionWithContext(UserException.dataReadError(), message).build(logger);
- }
-
- } else {
- writeDataSwitch(writer.rootAsList());
- }
- break;
- case END_ARRAY:
-
- if (inOuterList) {
- confirmLast();
- return ReadState.END_OF_STREAM;
- } else {
- throw getExceptionWithContext(UserException.dataReadError(), null).message(
- "Failure while parsing JSON. Ran across unexpected %s.", JsonToken.END_ARRAY).build(logger);
- }
-
- case NOT_AVAILABLE:
- return ReadState.END_OF_STREAM;
- default:
- throw getExceptionWithContext(UserException.dataReadError(), null)
- .message(
- "Failure while parsing JSON. Found token of [%s]. Drill currently only supports parsing "
- + "json strings that contain either lists or maps. The root object cannot be a scalar.",
- t).build(logger);
+ throw createDocumentTopLevelException();
}
-
- return ReadState.WRITE_SUCCEED;
-
}
private void writeDataSwitch(MapWriter w) throws IOException {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
index 7a601eb..719c3a3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
@@ -261,4 +261,17 @@ public class TestJsonRecordReader extends BaseTestQuery {
}
throw new Exception("testNotCountingQueryNotSkippingInvalidJSONRecords");
}
+
+ // DRILL-7362: COUNT(*) over listdoc.json (expected to be a single top-level array)
+ // goes through CountingJsonReader and must return 2 instead of a JSON parse error.
+ @Test
+ @Category(UnlikelyTest.class)
+ public void testContainingArrayCount() throws Exception {
+ testBuilder()
+ .sqlQuery("select count(*) as cnt from cp.`store/json/listdoc.json`")
+ .unOrdered()
+ .baselineColumns("cnt")
+ .baselineValues(2L)
+ .go();
+ }
}