You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by gp...@apache.org on 2019/09/08 03:00:12 UTC

[drill] 02/04: DRILL-7362: COUNT(*) on JSON with outer list results in JsonParse error

This is an automated email from the ASF dual-hosted git repository.

gparai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 797524dd2df4a18ef17752aff501a9e099f50a93
Author: ozinoviev <oz...@solit-clouds.ru>
AuthorDate: Thu Aug 29 15:14:17 2019 +0300

    DRILL-7362: COUNT(*) on JSON with outer list results in JsonParse error
    
    closes #1849
---
 .../store/easy/json/reader/BaseJsonReader.java     | 167 +++++++++++++++++++++
 .../store/easy/json/reader/CountingJsonReader.java |  47 ++----
 .../drill/exec/vector/complex/fn/JsonReader.java   | 126 ++--------------
 .../exec/store/json/TestJsonRecordReader.java      |  13 ++
 4 files changed, 203 insertions(+), 150 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonReader.java
new file mode 100644
index 0000000..983aa9f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonReader.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.reader;
+
+import com.fasterxml.jackson.core.JsonToken;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Basic reader implementation for json documents.
+ */
+public abstract class BaseJsonReader extends BaseJsonProcessor {
+
+  private static final Logger logger = LoggerFactory.getLogger(BaseJsonReader.class);
+
+  /**
+   * Describes whether or not this reader can unwrap a single root array record
+   * and treat it like a set of distinct records.
+   */
+  private final boolean skipOuterList;
+
+  /**
+   * Whether the reader is currently in a situation where we are unwrapping an
+   * outer list.
+   */
+  private boolean inOuterList;
+
+  public BaseJsonReader(DrillBuf workBuf, boolean enableNanInf, boolean enableEscapeAnyChar, boolean skipOuterList) {
+    super(workBuf, enableNanInf, enableEscapeAnyChar);
+    this.skipOuterList = skipOuterList;
+  }
+
+  @Override
+  public ReadState write(ComplexWriter writer) throws IOException {
+
+    try {
+      JsonToken t = lastSeenJsonToken;
+      if (t == null || t == JsonToken.END_OBJECT) {
+        t = parser.nextToken();
+      }
+      while (!parser.hasCurrentToken() && !parser.isClosed()) {
+        t = parser.nextToken();
+      }
+      lastSeenJsonToken = null;
+
+      if (parser.isClosed()) {
+        return ReadState.END_OF_STREAM;
+      }
+
+      ReadState readState = writeToVector(writer, t);
+
+      switch (readState) {
+        case END_OF_STREAM:
+        case WRITE_SUCCEED:
+          return readState;
+        default:
+          throw getExceptionWithContext(UserException.dataReadError(), null).message(
+            "Failure while reading JSON. (Got an invalid read state %s )", readState.toString())
+            .build(logger);
+      }
+    } catch (com.fasterxml.jackson.core.JsonParseException ex) {
+      if (ignoreJSONParseError()) {
+        if (processJSONException() == JsonExceptionProcessingState.END_OF_STREAM) {
+          return ReadState.JSON_RECORD_PARSE_EOF_ERROR;
+        } else {
+          return ReadState.JSON_RECORD_PARSE_ERROR;
+        }
+      } else {
+        throw ex;
+      }
+    }
+  }
+
+
+  private ReadState writeToVector(ComplexWriter writer, JsonToken t)
+    throws IOException {
+
+    switch (t) {
+      case START_OBJECT:
+        writeDocument(writer, t);
+        break;
+      case START_ARRAY:
+        if (inOuterList) {
+          throw createDocumentTopLevelException();
+        }
+
+        if (skipOuterList) {
+          t = parser.nextToken();
+          if (t == JsonToken.START_OBJECT) {
+            inOuterList = true;
+            writeDocument(writer, t);
+          } else {
+            throw createDocumentTopLevelException();
+          }
+
+        } else {
+          writeDocument(writer, t);
+        }
+        break;
+      case END_ARRAY:
+
+        if (inOuterList) {
+          confirmLast();
+          return ReadState.END_OF_STREAM;
+        } else {
+          throw getExceptionWithContext(UserException.dataReadError(), null).message(
+            "Failure while parsing JSON.  Ran across unexpected %s.", JsonToken.END_ARRAY).build(logger);
+        }
+
+      case NOT_AVAILABLE:
+        return ReadState.END_OF_STREAM;
+      default:
+        throw getExceptionWithContext(UserException.dataReadError(), null)
+          .message(
+            "Failure while parsing JSON.  Found token of [%s].  Drill currently only supports parsing "
+              + "json strings that contain either lists or maps.  The root object cannot be a scalar.",
+            t).build(logger);
+    }
+
+    return ReadState.WRITE_SUCCEED;
+  }
+
+  /**
+   * Writes the contents of the JSON node starting with the specified token into a complex vector.
+   * The token can take one of the following values:
+   * - START_ARRAY - the top level of the JSON document is an array and skipping of the outer list is disabled
+   * - START_OBJECT - the node is a map: either one of a set of white space delimited top-level maps,
+   *                  or an element of the outer list when skipping of the outer list is enabled
+   */
+  protected abstract void writeDocument(ComplexWriter writer, JsonToken t) throws IOException;
+
+  protected UserException createDocumentTopLevelException() {
+    String message = "The top level of your document must either be a single array of maps or a set "
+      + "of white space delimited maps.";
+    return getExceptionWithContext(UserException.dataReadError(), message).build(logger);
+  }
+
+  private void confirmLast() throws IOException {
+    parser.nextToken();
+    if (!parser.isClosed()) {
+      String message = "Drill attempted to unwrap a toplevel list in your document. "
+        + "However, it appears that there is trailing content after this top level list.  Drill only "
+        + "supports querying a set of distinct maps or a single json array with multiple inner maps.";
+      throw getExceptionWithContext(UserException.dataReadError(), message).build(logger);
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
index 73b93f4..c9bcb0d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
@@ -23,50 +23,29 @@ import com.fasterxml.jackson.core.JsonToken;
 
 import io.netty.buffer.DrillBuf;
 
-import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 
-public class CountingJsonReader extends BaseJsonProcessor {
+public class CountingJsonReader extends BaseJsonReader {
 
   public CountingJsonReader(DrillBuf workBuf, boolean enableNanInf, boolean enableEscapeAnyChar) {
-    super(workBuf, enableNanInf, enableEscapeAnyChar);
+    super(workBuf, enableNanInf, enableEscapeAnyChar, true);
   }
 
   @Override
-  public ReadState write(BaseWriter.ComplexWriter writer) throws IOException {
-    try {
-      JsonToken token = lastSeenJsonToken;
-      if (token == null || token == JsonToken.END_OBJECT){
-        token = parser.nextToken();
-      }
-      lastSeenJsonToken = null;
-      if (token == JsonToken.FIELD_NAME) {
-        currentFieldName = parser.getText();
-      }
-      if (!parser.hasCurrentToken()) {
-        return ReadState.END_OF_STREAM;
-      } else if (token != JsonToken.START_OBJECT) {
-        throw new com.fasterxml.jackson.core.JsonParseException(
-            parser, String.format("Cannot read from the middle of a record. Current token was %s ", token));
-      }
-      writer.rootAsMap().bit("count").writeBit(1);
-      parser.skipChildren();
-    } catch (com.fasterxml.jackson.core.JsonParseException ex) {
-      if (ignoreJSONParseError()) {
-        if (processJSONException() == JsonExceptionProcessingState.END_OF_STREAM){
-          return ReadState.JSON_RECORD_PARSE_EOF_ERROR;
-        }
-        else{
-          return ReadState.JSON_RECORD_PARSE_ERROR;
-        }
-      } else {
-        throw ex;
-      }
+  protected void writeDocument(ComplexWriter writer, JsonToken t) throws IOException {
+    switch (t) {
+      case START_OBJECT:
+      case START_ARRAY:
+        writer.rootAsMap().bit("count").writeBit(1);
+        parser.skipChildren();
+        break;
+      default:
+        throw createDocumentTopLevelException();
     }
-    return ReadState.WRITE_SUCCEED;
   }
 
   @Override
-  public void ensureAtLeastOneField(BaseWriter.ComplexWriter writer) {
+  public void ensureAtLeastOneField(ComplexWriter writer) {
 
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index 3426243..ec838d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -26,7 +26,7 @@ import java.util.List;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor;
+import org.apache.drill.exec.store.easy.json.reader.BaseJsonReader;
 import org.apache.drill.exec.vector.complex.fn.VectorOutput.ListVectorOutput;
 import org.apache.drill.exec.vector.complex.fn.VectorOutput.MapVectorOutput;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
@@ -40,7 +40,7 @@ import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
-public class JsonReader extends BaseJsonProcessor {
+public class JsonReader extends BaseJsonReader {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
       .getLogger(JsonReader.class);
   public final static int MAX_RECORD_SIZE = 128 * 1024;
@@ -59,25 +59,12 @@ public class JsonReader extends BaseJsonProcessor {
    */
   private final List<ListWriter> emptyArrayWriters = Lists.newArrayList();
 
-  /**
-   * Describes whether or not this reader can unwrap a single root array record
-   * and treat it like a set of distinct records.
-   */
-  private final boolean skipOuterList;
-
-  /**
-   * Whether the reader is currently in a situation where we are unwrapping an
-   * outer list.
-   */
-  private boolean inOuterList;
-
   private FieldSelection selection;
 
   private JsonReader(Builder builder) {
-    super(builder.managedBuf, builder.enableNanInf, builder.enableEscapeAnyChar);
+    super(builder.managedBuf, builder.enableNanInf, builder.enableEscapeAnyChar, builder.skipOuterList);
     selection = FieldSelection.getFieldSelection(builder.columns);
     workingBuffer = builder.workingBuffer;
-    skipOuterList = builder.skipOuterList;
     allTextMode = builder.allTextMode;
     columns = builder.columns;
     mapOutput = builder.mapOutput;
@@ -184,110 +171,17 @@ public class JsonReader extends BaseJsonProcessor {
   }
 
   @Override
-  public ReadState write(ComplexWriter writer) throws IOException {
-
-    ReadState readState = null;
-    try {
-      JsonToken t = lastSeenJsonToken;
-      if (t == null || t == JsonToken.END_OBJECT) {
-        t = parser.nextToken();
-      }
-      while (!parser.hasCurrentToken() && !parser.isClosed()) {
-        t = parser.nextToken();
-      }
-      lastSeenJsonToken = null;
-
-      if (parser.isClosed()) {
-        return ReadState.END_OF_STREAM;
-      }
-
-      readState = writeToVector(writer, t);
-
-      switch (readState) {
-      case END_OF_STREAM:
+  protected void writeDocument(ComplexWriter writer, JsonToken t) throws IOException {
+    switch (t) {
+      case START_OBJECT:
+        writeDataSwitch(writer.rootAsMap());
         break;
-      case WRITE_SUCCEED:
+      case START_ARRAY:
+        writeDataSwitch(writer.rootAsList());
         break;
       default:
-        throw getExceptionWithContext(UserException.dataReadError(), null).message(
-            "Failure while reading JSON. (Got an invalid read state %s )", readState.toString())
-            .build(logger);
-      }
-    } catch (com.fasterxml.jackson.core.JsonParseException ex) {
-      if (ignoreJSONParseError()) {
-        if (processJSONException() == JsonExceptionProcessingState.END_OF_STREAM) {
-          return ReadState.JSON_RECORD_PARSE_EOF_ERROR;
-        } else {
-          return ReadState.JSON_RECORD_PARSE_ERROR;
-        }
-      } else {
-        throw ex;
-      }
-    }
-    return readState;
-  }
-
-  private void confirmLast() throws IOException {
-    parser.nextToken();
-    if (!parser.isClosed()) {
-      String message = "Drill attempted to unwrap a toplevel list in your document. "
-          + "However, it appears that there is trailing content after this top level list.  Drill only "
-          + "supports querying a set of distinct maps or a single json array with multiple inner maps.";
-      throw getExceptionWithContext(UserException.dataReadError(), message).build(logger);
-    }
-  }
-
-  private ReadState writeToVector(ComplexWriter writer, JsonToken t)
-      throws IOException {
-
-    switch (t) {
-    case START_OBJECT:
-      writeDataSwitch(writer.rootAsMap());
-      break;
-    case START_ARRAY:
-      if (inOuterList) {
-        String message = "The top level of your document must either be a single array of maps or a set "
-            + "of white space delimited maps.";
-        throw getExceptionWithContext(UserException.dataReadError(), message).build(logger);
-      }
-
-      if (skipOuterList) {
-        t = parser.nextToken();
-        if (t == JsonToken.START_OBJECT) {
-          inOuterList = true;
-          writeDataSwitch(writer.rootAsMap());
-        } else {
-          String message = "The top level of your document must either be a single array of maps or a set "
-              + "of white space delimited maps.";
-          throw getExceptionWithContext(UserException.dataReadError(), message).build(logger);
-        }
-
-      } else {
-        writeDataSwitch(writer.rootAsList());
-      }
-      break;
-    case END_ARRAY:
-
-      if (inOuterList) {
-        confirmLast();
-        return ReadState.END_OF_STREAM;
-      } else {
-        throw getExceptionWithContext(UserException.dataReadError(), null).message(
-            "Failure while parsing JSON.  Ran across unexpected %s.", JsonToken.END_ARRAY).build(logger);
-      }
-
-    case NOT_AVAILABLE:
-      return ReadState.END_OF_STREAM;
-    default:
-      throw getExceptionWithContext(UserException.dataReadError(), null)
-          .message(
-              "Failure while parsing JSON.  Found token of [%s].  Drill currently only supports parsing "
-                  + "json strings that contain either lists or maps.  The root object cannot be a scalar.",
-              t).build(logger);
+        throw createDocumentTopLevelException();
     }
-
-    return ReadState.WRITE_SUCCEED;
-
   }
 
   private void writeDataSwitch(MapWriter w) throws IOException {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
index 7a601eb..719c3a3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
@@ -261,4 +261,17 @@ public class TestJsonRecordReader extends BaseTestQuery {
     }
     throw new Exception("testNotCountingQueryNotSkippingInvalidJSONRecords");
   }
+
+  @Test
+  @Category(UnlikelyTest.class)
+  // See DRILL-7362
+  /* Test for CountingJsonReader */
+  public void testContainingArrayCount() throws Exception {
+    testBuilder()
+      .sqlQuery("select count(*) as cnt from cp.`store/json/listdoc.json`")
+      .unOrdered()
+      .baselineColumns("cnt")
+      .baselineValues(2L)
+      .go();
+  }
 }