Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/09/11 08:02:50 UTC

[GitHub] [druid] FrankChen021 opened a new pull request #10383: Fix ingestion failure of pretty-formatted JSON message

FrankChen021 opened a new pull request #10383:
URL: https://github.com/apache/druid/pull/10383


   
   This PR fixes #10259 . 
   
   ### Description
   
   The cause and solution have been discussed in full in the issue.
   
   In this PR, `JsonReader` no longer extends `TextReader` and instead implements `InputEntityReader` directly, so that it can handle both pretty-formatted JSON text and line-delimited JSON by relying on `ObjectMapper.readValues`.
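
To illustrate why line-by-line reading fails on pretty-printed input while reading one root value at a time works, here is a minimal, dependency-free sketch. It is not the PR's code and not Jackson's implementation: the class `ConcatenatedJsonSplitter` and its `split` method are hypothetical names, and the sketch assumes every top-level value is a JSON object. In the PR itself this job is delegated to Jackson's `ObjectMapper.readValues`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; Druid delegates this to Jackson.
public class ConcatenatedJsonSplitter
{
  /**
   * Splits text holding one or more top-level JSON objects, separated by
   * optional whitespace, into one string per object. Newlines inside an
   * object (pretty-printed JSON) do not end the object.
   */
  public static List<String> split(String text)
  {
    final List<String> objects = new ArrayList<>();
    int depth = 0;            // current nesting depth of '{' ... '}'
    int start = -1;           // offset where the current top-level object began
    boolean inString = false; // true while inside a JSON string literal
    boolean escaped = false;  // true right after a backslash inside a string

    for (int i = 0; i < text.length(); i++) {
      final char c = text.charAt(i);
      if (inString) {
        // Braces inside string literals must not change the depth.
        if (escaped) {
          escaped = false;
        } else if (c == '\\') {
          escaped = true;
        } else if (c == '"') {
          inString = false;
        }
      } else if (c == '"') {
        inString = true;
      } else if (c == '{') {
        if (depth == 0) {
          start = i;
        }
        depth++;
      } else if (c == '}') {
        if (depth == 0) {
          throw new IllegalArgumentException("Unbalanced '}' at offset " + i);
        }
        depth--;
        if (depth == 0) {
          objects.add(text.substring(start, i + 1));
        }
      }
    }
    if (depth != 0 || inString) {
      throw new IllegalArgumentException("Unterminated JSON object");
    }
    return objects;
  }
}
```

Given a pretty-printed object followed by a compact one, `split` returns two strings, whereas splitting the same text on newlines would produce fragments that are not valid JSON on their own.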
   
   
   
   <hr>
   
   This PR has:
   - [X] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/licenses.yaml)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [X] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [X] been tested in a test Druid cluster.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] FrankChen021 commented on pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-690976487








[GitHub] [druid] gianm edited a comment on pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
gianm edited a comment on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-691239531








[GitHub] [druid] jihoonson commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r520018811



##########
File path: extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
##########
@@ -2726,4 +2726,83 @@ public void close()
         null
     );
   }
+
+  @Test(timeout = 60_000L)
+  public void testMultipleLinesJSONText() throws Exception
+  {
+    reportParseExceptions = false;
+    maxParseExceptions = 1000;
+    maxSavedParseExceptions = 2;
+
+    // Insert data
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+
+      //multiple objects in one Kafka record will yield 2 rows in druid
+      String wellformed = toJsonString(true, "2049", "d2", "y", "10", "22.0", "2.0") +
+                     toJsonString(true, "2049", "d3", "y", "10", "23.0", "3.0");
+
+      //multiple objects in one Kafka record but some objects are in ill-formed format
+      //the whole ProducerRecord will be discarded
+      String illformed = "{\"timestamp\":2049, \"dim1\": \"d4\", \"dim2\":\"x\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":\"2.0\" }" +
+                     "{\"timestamp\":2049, \"dim1\": \"d5\", \"dim2\":\"y\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":invalidFormat }" +
+                     "{\"timestamp\":2049, \"dim1\": \"d6\", \"dim2\":\"z\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":\"3.0\" }";
+
+      ProducerRecord[] producerRecords = new ProducerRecord[]{
+          // pretty formatted
+          new ProducerRecord<>(topic, 0, null, jb(true, "2049", "d1", "y", "10", "20.0", "1.0")),
+          //well-formed
+          new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8(wellformed)),
+          //ill-formed
+          new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("illformed")),

Review comment:
       Did you intend `StringUtils.toUtf8(illformed)`? The variable `illformed` is not in use.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
##########
@@ -111,38 +113,49 @@ public SamplerResponse sample(
     try (final CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample();
          final IncrementalIndex<Aggregator> index = buildIncrementalIndex(nonNullSamplerConfig, nonNullDataSchema);
          final Closer closer1 = closer) {
-      SamplerResponseRow[] responseRows = new SamplerResponseRow[nonNullSamplerConfig.getNumRows()];
-      int counter = 0, numRowsIndexed = 0;
-
-      while (counter < responseRows.length && iterator.hasNext()) {
-        Map<String, Object> rawColumns = null;
-        try {
-          final InputRowListPlusRawValues inputRowListPlusRawValues = iterator.next();
-
-          if (inputRowListPlusRawValues.getRawValues() != null) {
-            rawColumns = inputRowListPlusRawValues.getRawValues();
-          }
-
-          if (inputRowListPlusRawValues.getParseException() != null) {
-            throw inputRowListPlusRawValues.getParseException();
+      List<SamplerResponseRow> responseRows = new ArrayList<>(nonNullSamplerConfig.getNumRows());
+      int numRowsIndexed = 0;
+
+      while (responseRows.size() < nonNullSamplerConfig.getNumRows() && iterator.hasNext()) {
+        final InputRowListPlusRawValues inputRowListPlusRawValues = iterator.next();
+
+        final List<Map<String, Object>> rawColumnsList = inputRowListPlusRawValues.getRawValuesList();
+
+        final ParseException parseException = inputRowListPlusRawValues.getParseException();
+        if (parseException != null) {
+          if (rawColumnsList != null) {
+            // add all rows to response
+            responseRows.addAll(rawColumnsList.stream()
+                                              .map(rawColumns -> new SamplerResponseRow(rawColumns, null, true, parseException.getMessage()))
+                                              .collect(Collectors.toList()));

Review comment:
       nit: with this change, when a parseException is thrown while parsing a list of rows in one message, those rows will be added as separate rows in `responseRows`, each with the same parseException. This can be confusing since the parseException error message may seem unrelated to the `rawColumns` in the same `responseRow`. IMO, the better fix in this case would be to store the whole `rawColumnsList` in one `SamplerResponseRow`. Then, the parseException can indicate that it was thrown while parsing one of the rows in `rawColumnsList`. However, this requires a change on the web console side as well. I'm OK with fixing this in a follow-up PR.
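
The two shapes discussed in this comment can be contrasted with a small sketch. Everything here is hypothetical and simplified for illustration: `ResponseRow` is a stand-in, not Druid's `SamplerResponseRow`, and the method names are invented. It only shows the difference between repeating one error on every raw row and attaching it once to the whole message.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; none of these names exist in Druid.
public class SamplerGroupingSketch
{
  /** Simplified stand-in for a sampler response entry. */
  public static final class ResponseRow
  {
    public final List<Map<String, Object>> rawColumnsList;
    public final String error; // null when parsing succeeded

    public ResponseRow(List<Map<String, Object>> rawColumnsList, String error)
    {
      this.rawColumnsList = rawColumnsList;
      this.error = error;
    }
  }

  /** Behavior described in the comment: one entry per raw row, all sharing the same error. */
  public static List<ResponseRow> onePerRawRow(List<Map<String, Object>> rawColumnsList, String error)
  {
    final List<ResponseRow> rows = new ArrayList<>();
    for (Map<String, Object> rawColumns : rawColumnsList) {
      rows.add(new ResponseRow(Collections.singletonList(rawColumns), error));
    }
    return rows;
  }

  /** Suggested alternative: one entry that carries every raw row of the message plus the error. */
  public static List<ResponseRow> oneForWholeMessage(List<Map<String, Object>> rawColumnsList, String error)
  {
    return Collections.singletonList(new ResponseRow(rawColumnsList, error));
  }
}
```

With the second shape, a reader of the response can tell that the error belongs to the message as a whole rather than to any particular raw row.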

##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
##########
@@ -1060,6 +1062,107 @@ public void testWithFilter() throws IOException
     );
   }
 
+  @Test
+  public void testIndexParseException() throws IOException

Review comment:
       Thanks for adding this test! Similar to https://github.com/apache/druid/pull/10383/files#diff-ef25ac1cc1f275b47b939b65e1d0c8b8b8512aeada52d06b8541b8f381df03eeR2731, could you please add a unit test for sampling a block of multiple JSON strings? The unit test can be run only when `parserType` is `STR_JSON`. [We usually just return in the unit test when the parameter is not what we want to test with](https://github.com/apache/druid/blob/master/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java#L6812-L6814). It would be nice if the test verifies the following:
   - The sampler response when there is no parseException with a list of multiple JSON strings.
   - The sampler response when there is a parseException thrown while parsing a list of multiple JSON strings. Maybe you can improve this unit test to do it as well.






[GitHub] [druid] FrankChen021 commented on pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-727141902


   > I'm merging this PR. Thanks @FrankChen021!
   
   I appreciate your review and comments very much!




[GitHub] [druid] FrankChen021 commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r494728667



##########
File path: core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
##########
@@ -33,13 +40,98 @@
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
-public class JsonReader extends TextReader
+/**
+ * <pre>
+ * In contrast to {@link JsonLineReader} which processes input text line by line independently,
+ * this class tries to parse the input text as a whole to an array of objects.
+ *
+ * The input text can be:
+ * 1. a JSON string of an object in a line or multiple lines (such as pretty-printed JSON text)
+ * 2. multiple JSON object strings concatenated by white space character(s)
+ *
+ * For case 2, note that if an exception is thrown when parsing one JSON string,
+ * the rest of the JSON text will all be ignored
+ *
+ * For more information, see: https://github.com/apache/druid/pull/10383
+ * </pre>
+ */
+public class JsonReader implements InputEntityReader

Review comment:
       > The sampler currently assumes that there is only one JSON object in an input chunk which could have either an array or a nested object. 
   
   That's the root cause why `ExceptionThrowingIterator` is extracted and `JsonReader` inherits from `InputEntityReader` directly.
   
   Your suggestion provides a new and simple way to deal with it. I'll test the code later.

##########
File path: core/src/main/java/org/apache/druid/data/input/ExceptionThrowingIterator.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+public class ExceptionThrowingIterator<T> implements CloseableIterator<T>
+{
+  private final RuntimeException exception;
+
+  private boolean thrown = false;
+
+  public ExceptionThrowingIterator(Throwable exception)
+  {
+    this.exception = exception instanceof RuntimeException
+                     ? (RuntimeException) exception
+                     : new RuntimeException(exception);
+  }
+
+  @Override
+  public boolean hasNext()
+  {
+    return !thrown;
+  }
+
+  @Override
+  public T next()
+  {
+    thrown = true;

Review comment:
       I don't know why SpotBugs didn't report the problem before this class was extracted. But if we adopt the solution that makes `JsonReader` inherit `IntermediateRowParsingReader` as you suggest, this modification should be rolled back, and I'll check again whether SpotBugs still reports the bug.
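
For readers without the full file, the idea behind the class under review can be sketched independently of Druid. This is an assumption-laden illustration, not the PR's code: it implements `java.util.Iterator` instead of Druid's `CloseableIterator`, the name `ThrowOnceIterator` is hypothetical, and the body of `next()` after `thrown = true` is a guess, since the hunk above is cut off at that line.

```java
import java.util.Iterator;

// Hypothetical, self-contained stand-in for the iterator shown in the hunk above.
public class ThrowOnceIterator<T> implements Iterator<T>
{
  private final RuntimeException exception;
  private boolean thrown = false;

  public ThrowOnceIterator(Throwable exception)
  {
    // Wrap checked exceptions so that next() can rethrow them unchecked.
    this.exception = exception instanceof RuntimeException
                     ? (RuntimeException) exception
                     : new RuntimeException(exception);
  }

  @Override
  public boolean hasNext()
  {
    // Report one pending element so that the caller invokes next() exactly once.
    return !thrown;
  }

  @Override
  public T next()
  {
    // Assumed behavior: surface the stored failure on the first call.
    thrown = true;
    throw exception;
  }
}
```

An iterator like this lets a reader defer a failure until iteration time, so the caller handles it in the same place as ordinary per-row parse errors.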






[GitHub] [druid] FrankChen021 commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r503772217



##########
File path: core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -96,23 +101,45 @@ public void close() throws IOException
   public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
   {
     return intermediateRowIterator().map(row -> {
-      final Map<String, Object> rawColumns;
+
+      final List<Map<String, Object>> rawColumnsList;
       try {
-        rawColumns = toMap(row);
+        rawColumnsList = toMap(row);
       }
       catch (Exception e) {
-        return InputRowListPlusRawValues.of(null, new ParseException(e, "Unable to parse row [%s] into JSON", row));
+        return Collections.singletonList(InputRowListPlusRawValues.of(null,
+                                                                      new ParseException(e, "Unable to parse row [%s] into JSON", row)));
+      }
+
+      if (CollectionUtils.isNullOrEmpty(rawColumnsList)) {
+        return Collections.singletonList(InputRowListPlusRawValues.of(null,
+                                                                      new ParseException("No map object parsed for row [%s]", row)));
       }
+
+      List<InputRow> rows;
       try {
-        return InputRowListPlusRawValues.of(parseInputRows(row), rawColumns);
+        rows = parseInputRows(row);
       }
       catch (ParseException e) {
-        return InputRowListPlusRawValues.of(rawColumns, e);
+        return rawColumnsList.stream().map(rawColumn -> InputRowListPlusRawValues.of(rawColumn, e)).collect(Collectors.toList());
       }
       catch (IOException e) {
-        return InputRowListPlusRawValues.of(rawColumns, new ParseException(e, "Unable to parse row [%s] into inputRow", row));
+        ParseException exception = new ParseException(e, "Unable to parse row [%s] into inputRow", row);
+        return rawColumnsList.stream().map(rawColumn -> InputRowListPlusRawValues.of(rawColumn, exception)).collect(Collectors.toList());
+      }

Review comment:
       I agree with you. 
   
   `rawValues` in `InputRowListPlusRawValues` should also be a list, to correspond to the list of `inputRows`. Since `rawValues` is also passed to `SamplerResponseRow` when an exception occurs, changing `rawValues` from `Map` to `List<Map>` also involves changes to the web console. I don't know whether the front-end change should also be included in this PR.






[GitHub] [druid] FrankChen021 commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r520435602



##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
##########
@@ -1060,6 +1062,107 @@ public void testWithFilter() throws IOException
     );
   }
 
+  @Test
+  public void testIndexParseException() throws IOException

Review comment:
       I've added a test [`testMultipleJsonStringInOneBlock`](https://github.com/apache/druid/pull/10383/files#diff-b4114cca5866803474d2b6ec21a1d566b90eab5d4a31d4f051b93cd8883914ebR1187),  please check it.






[GitHub] [druid] jihoonson commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r521857100



##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
##########
@@ -1163,6 +1172,121 @@ public void testIndexParseException() throws IOException
     );
   }
 
+  /**
+   *
+   * This case tests sampling for multiple json lines in one text block
+   * Currently only RecordSupplierInputSource supports this kind of input, see https://github.com/apache/druid/pull/10383 for more information
+   *
+   * This test combines illegal json block and legal json block together to verify:
+   * 1. all lines in the illegal json block should not be parsed
+   * 2. the illegal json block should not affect the processing of the 2nd record
+   * 3. all lines in legal json block should be parsed successfully
+   *
+   */
+  @Test
+  public void testMultipleJsonStringInOneBlock() throws IOException
+  {
+    if (!ParserType.STR_JSON.equals(parserType) || !useInputFormatApi) {
+      return;
+    }
+
+    final TimestampSpec timestampSpec = new TimestampSpec("t", null, null);
+    final DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        ImmutableList.of(StringDimensionSchema.create("dim1PlusBar"))
+    );
+    final TransformSpec transformSpec = new TransformSpec(
+        null,
+        ImmutableList.of(new ExpressionTransform("dim1PlusBar", "concat(dim1 + 'bar')", TestExprMacroTable.INSTANCE))
+    );
+    final AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
+    final GranularitySpec granularitySpec = new UniformGranularitySpec(
+        Granularities.DAY,
+        Granularities.HOUR,
+        true,
+        null
+    );
+    final DataSchema dataSchema = createDataSchema(
+        timestampSpec,
+        dimensionsSpec,
+        aggregatorFactories,
+        granularitySpec,
+        transformSpec
+    );
+
+    List<String> jsonBlockList = ImmutableList.of(
+        // include the line which can't be parsed into JSON object to form a illegal json block
+        String.join("", STR_JSON_ROWS),
+
+        // exclude the last line to form a legal json block
+        String.join("", STR_JSON_ROWS.stream().limit(STR_JSON_ROWS.size() - 1).collect(Collectors.toList()))

Review comment:
       > Every time I push a commit to a branch that is being merged, I run the test cases in the module which contains the changes in that commit. If it's OK, I'll push the commit. But I find that there's a high probability that CI fails. Sometimes it's related to the inspection check, sometimes it's caused by failures of test cases in other modules, sometimes it's about the dependency check, and sometimes it has something to do with the license check.
   > 
   > I wonder what steps you follow to check before pushing a commit? Do you run all the cases in all modules? Or is there a simple way to run the checks mentioned above?
   
   @FrankChen021 That's what I usually do as well. To be honest, unexpected CI failures annoy me too :sweat_smile: You can run those checks on your own by running the same commands that Travis runs. You may want to set up some pre-commit/post-commit hooks. The best would be some automatic correction for trivial issues, but I'm not sure whether there is such a tool available that is mature and reliable enough.






[GitHub] [druid] jihoonson merged pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson merged pull request #10383:
URL: https://github.com/apache/druid/pull/10383


   






[GitHub] [druid] FrankChen021 commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r520275473



##########
File path: extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
##########
@@ -2726,4 +2726,83 @@ public void close()
         null
     );
   }
+
+  @Test(timeout = 60_000L)
+  public void testMultipleLinesJSONText() throws Exception
+  {
+    reportParseExceptions = false;
+    maxParseExceptions = 1000;
+    maxSavedParseExceptions = 2;
+
+    // Insert data
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+
+      //multiple objects in one Kafka record will yield 2 rows in druid
+      String wellformed = toJsonString(true, "2049", "d2", "y", "10", "22.0", "2.0") +
+                     toJsonString(true, "2049", "d3", "y", "10", "23.0", "3.0");
+
+      //multiple objects in one Kafka record but some objects are in ill-formed format
+      //the whole ProducerRecord will be discarded
+      String illformed = "{\"timestamp\":2049, \"dim1\": \"d4\", \"dim2\":\"x\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":\"2.0\" }" +
+                     "{\"timestamp\":2049, \"dim1\": \"d5\", \"dim2\":\"y\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":invalidFormat }" +
+                     "{\"timestamp\":2049, \"dim1\": \"d6\", \"dim2\":\"z\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":\"3.0\" }";
+
+      ProducerRecord[] producerRecords = new ProducerRecord[]{
+          // pretty formatted
+          new ProducerRecord<>(topic, 0, null, jb(true, "2049", "d1", "y", "10", "20.0", "1.0")),
+          //well-formed
+          new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8(wellformed)),
+          //ill-formed
+          new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("illformed")),

Review comment:
       Yes, `illformed` here should be a variable instead of a text string.






[GitHub] [druid] suneet-s commented on pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
suneet-s commented on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-691219861


   @FrankChen021 I think we should add integration tests for this. Consider updating the data files in `integration-tests/src/test/resources/data/batch_index/json/` to include multi-line JSON.
   
   The CI failures look legit. I re-triggered the failing integration test since it looked like it might have been flaky.




[GitHub] [druid] FrankChen021 commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r502776914



##########
File path: core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -96,23 +99,35 @@ public void close() throws IOException
   public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
   {
     return intermediateRowIterator().map(row -> {
-      final Map<String, Object> rawColumns;
+
+      final List<Map<String, Object>> rawColumnsList;
       try {
-        rawColumns = toMap(row);
+        rawColumnsList = toMap(row);
       }
       catch (Exception e) {
-        return InputRowListPlusRawValues.of(null, new ParseException(e, "Unable to parse row [%s] into JSON", row));
+        return Collections.singletonList(InputRowListPlusRawValues.of(null,
+                                                                      new ParseException(e, "Unable to parse row [%s] into JSON", row)));
       }
+
+      List<InputRow> rows;
       try {
-        return InputRowListPlusRawValues.of(parseInputRows(row), rawColumns);
+        rows = parseInputRows(row);
       }
       catch (ParseException e) {
-        return InputRowListPlusRawValues.of(rawColumns, e);
+        return Collections.singletonList(InputRowListPlusRawValues.of(rawColumnsList.isEmpty() ? null : rawColumnsList.get(0), e));

Review comment:
       It's been fixed. Now it's ready for final review. 






[GitHub] [druid] FrankChen021 commented on pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-699862612


   @jihoonson The patch has been submitted. Changing the signature of `IntermediateRowParsingReader#toMap` requires changes to all existing subclasses, and CI now reports those subclasses as having insufficient branch or line coverage.




[GitHub] [druid] jihoonson commented on pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-691353141








[GitHub] [druid] jihoonson commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r502079779



##########
File path: core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -96,23 +99,35 @@ public void close() throws IOException
   public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
   {
     return intermediateRowIterator().map(row -> {
-      final Map<String, Object> rawColumns;
+
+      final List<Map<String, Object>> rawColumnsList;
       try {
-        rawColumns = toMap(row);
+        rawColumnsList = toMap(row);
       }
       catch (Exception e) {
-        return InputRowListPlusRawValues.of(null, new ParseException(e, "Unable to parse row [%s] into JSON", row));
+        return Collections.singletonList(InputRowListPlusRawValues.of(null,
+                                                                      new ParseException(e, "Unable to parse row [%s] into JSON", row)));
       }
+
+      List<InputRow> rows;
       try {
-        return InputRowListPlusRawValues.of(parseInputRows(row), rawColumns);
+        rows = parseInputRows(row);
       }
       catch (ParseException e) {
-        return InputRowListPlusRawValues.of(rawColumns, e);
+        return Collections.singletonList(InputRowListPlusRawValues.of(rawColumnsList.isEmpty() ? null : rawColumnsList.get(0), e));

Review comment:
       Some tests are failing in `InputSourceSamplerTest`, apparently because of this change.







[GitHub] [druid] jihoonson commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r487371583



##########
File path: core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
##########
@@ -49,41 +55,99 @@
       boolean keepNullColumns
   )
   {
-    super(inputRowSchema, source);
+    this.inputRowSchema = inputRowSchema;
+    this.source = source;
     this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns));
     this.mapper = mapper;
   }
 
-  @Override
-  public List<InputRow> parseInputRows(String line) throws IOException, ParseException
-  {
-    final JsonNode document = mapper.readValue(line, JsonNode.class);
-    final Map<String, Object> flattened = flattener.flatten(document);
-    return Collections.singletonList(MapInputRowParser.parse(getInputRowSchema(), flattened));
-  }
 
   @Override
-  public Map<String, Object> toMap(String intermediateRow) throws IOException
+  public CloseableIterator<InputRow> read() throws IOException
   {
-    //noinspection unchecked
-    return mapper.readValue(intermediateRow, Map.class);
+    final MappingIterator<JsonNode> delegate = mapper.readValues(
+        new JsonFactory().createParser(this.source.open()),

Review comment:
       The `parser` created from `JsonFactory` is `Closeable` which should be closed when the below `CloseableIterator` is closed. 

##########
File path: core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
##########
@@ -49,41 +55,99 @@
       boolean keepNullColumns
   )
   {
-    super(inputRowSchema, source);
+    this.inputRowSchema = inputRowSchema;
+    this.source = source;
     this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns));
     this.mapper = mapper;
   }
 
-  @Override
-  public List<InputRow> parseInputRows(String line) throws IOException, ParseException
-  {
-    final JsonNode document = mapper.readValue(line, JsonNode.class);
-    final Map<String, Object> flattened = flattener.flatten(document);
-    return Collections.singletonList(MapInputRowParser.parse(getInputRowSchema(), flattened));
-  }
 
   @Override
-  public Map<String, Object> toMap(String intermediateRow) throws IOException
+  public CloseableIterator<InputRow> read() throws IOException
   {
-    //noinspection unchecked
-    return mapper.readValue(intermediateRow, Map.class);
+    final MappingIterator<JsonNode> delegate = mapper.readValues(
+        new JsonFactory().createParser(this.source.open()),

Review comment:
       I haven't looked through the PR yet, but found that the `parser` created from `JsonFactory` is `Closeable` which should be closed when the below `CloseableIterator` is closed. I will review again when you update the PR.
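
A minimal sketch of the ownership pattern this comment asks for, written against the JDK only so it runs without Jackson or Druid on the classpath. `ClosingIterator` and `FakeParser` are hypothetical stand-ins for Druid's `CloseableIterator` and Jackson's `JsonParser`; the real classes have richer APIs, and this is not the code that ended up in the PR.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Iterator;

class ClosingIteratorSketch
{
  /** Hypothetical iterator that owns a resource and releases it in close(). */
  static final class ClosingIterator<T> implements Iterator<T>, Closeable
  {
    private final Iterator<T> delegate;
    private final Closeable resource;

    ClosingIterator(Iterator<T> delegate, Closeable resource)
    {
      this.delegate = delegate;
      this.resource = resource;
    }

    @Override
    public boolean hasNext()
    {
      return delegate.hasNext();
    }

    @Override
    public T next()
    {
      return delegate.next();
    }

    @Override
    public void close() throws IOException
    {
      // Closing the iterator closes the underlying parser as well.
      resource.close();
    }
  }

  /** Stand-in for the parser: it only records whether close() was called. */
  static final class FakeParser implements Closeable
  {
    boolean closed = false;

    @Override
    public void close()
    {
      closed = true;
    }
  }

  /** Drains the iterator inside try-with-resources and reports whether the parser was closed. */
  static boolean parserClosedAfterRead()
  {
    FakeParser parser = new FakeParser();
    try (ClosingIterator<String> rows =
             new ClosingIterator<>(Arrays.asList("row1", "row2").iterator(), parser)) {
      while (rows.hasNext()) {
        rows.next();
      }
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return parser.closed;
  }

  public static void main(String[] args)
  {
    System.out.println("parser closed: " + parserClosedAfterRead());
  }
}
```

The point of the design is that the caller only ever sees the iterator, so the iterator has to be the one that releases whatever was opened to build it.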







[GitHub] [druid] FrankChen021 commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r500971602



##########
File path: core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java
##########
@@ -69,7 +69,7 @@ public void testEquals()
               new ObjectMapper(),
               new ObjectMapper()
               )
-              .withIgnoredFields("objectMapper")
+              .withIgnoredFields("objectMapper", "lineSplittable")

Review comment:
       The new property `lineSplittable` is not declared `final`, so `EqualsVerifier` reports the warning `equals depends on mutable field`. The new commit calls the `suppress` method on `EqualsVerifier` to silence this warning.






[GitHub] [druid] FrankChen021 commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r520347805



##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
##########
@@ -1060,6 +1062,107 @@ public void testWithFilter() throws IOException
     );
   }
 
+  @Test
+  public void testIndexParseException() throws IOException

Review comment:
       > I think we need such tests because `KafkaIndexTaskTest` only covers the actual indexing side, but not the sampling side. Without those tests, we could break the sampler without being noticed.
   
   I forgot that test cases in `KafkaIndexTaskTest` only cover the indexing process. It's OK for me to add such tests. 
   
   BTW, the integration tests failed, but I don't think the failure is related to the changes in this PR: the same failure also shows up in my other PR. Is there some other problem?






[GitHub] [druid] FrankChen021 commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r521259660



##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
##########
@@ -1163,6 +1172,121 @@ public void testIndexParseException() throws IOException
     );
   }
 
+  /**
+   *
+   * This case tests sampling for multiple json lines in one text block
+   * Currently only RecordSupplierInputSource supports this kind of input, see https://github.com/apache/druid/pull/10383 for more information
+   *
+   * This test combines illegal json block and legal json block together to verify:
+   * 1. all lines in the illegal json block should not be parsed
+   * 2. the illegal json block should not affect the processing of the 2nd record
+   * 3. all lines in legal json block should be parsed successfully
+   *
+   */
+  @Test
+  public void testMultipleJsonStringInOneBlock() throws IOException
+  {
+    if (!ParserType.STR_JSON.equals(parserType) || !useInputFormatApi) {
+      return;
+    }
+
+    final TimestampSpec timestampSpec = new TimestampSpec("t", null, null);
+    final DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        ImmutableList.of(StringDimensionSchema.create("dim1PlusBar"))
+    );
+    final TransformSpec transformSpec = new TransformSpec(
+        null,
+        ImmutableList.of(new ExpressionTransform("dim1PlusBar", "concat(dim1 + 'bar')", TestExprMacroTable.INSTANCE))
+    );
+    final AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
+    final GranularitySpec granularitySpec = new UniformGranularitySpec(
+        Granularities.DAY,
+        Granularities.HOUR,
+        true,
+        null
+    );
+    final DataSchema dataSchema = createDataSchema(
+        timestampSpec,
+        dimensionsSpec,
+        aggregatorFactories,
+        granularitySpec,
+        transformSpec
+    );
+
+    List<String> jsonBlockList = ImmutableList.of(
+        // include the line which can't be parsed into JSON object to form a illegal json block
+        String.join("", STR_JSON_ROWS),
+
+        // exclude the last line to form a legal json block
+        String.join("", STR_JSON_ROWS.stream().limit(STR_JSON_ROWS.size() - 1).collect(Collectors.toList()))

Review comment:
       Hi @suneet-s, do you have any comments?






[GitHub] [druid] gianm edited a comment on pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
gianm edited a comment on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-691239531


   > I don't know whether Jackson provides any way to skip the ill-formed text, but I guess there's no such API for a parser.
   > So what's your suggestion on this problem? @gianm @jihoonson
   
   Hmm, this is definitely a problem. When we're reading JSON from a file, we should be skipping the "lines" that aren't parseable, and marking them as unparseable. I guess that ObjectMapper API was too good to be true 🙂
   
   Maybe we can do one of the following two things.
   
   Option 1: After an error we can manually skip to the line that starts with `{` and begin parsing there. Because the JsonParser has a buffer, we need to make sure we don't miss buffered but unparsed content. We might be able to do that with the `JsonParser.releaseBuffered` method, assuming it works properly after an error.
   
   Option 2: Introduce a `lineSplittable` parameter for the JsonReader like you originally suggested, but it would behave differently in two ways. First: it wouldn't be available on the JsonInputFormat. Instead, it would be automatically set to `true` for batch ingestion and `false` for streaming ingestion. Second: `false` doesn't mean "not line splittable", it just means "we should auto detect". In the `true` case (batch ingestion), the JsonReader will split on lines and parse each line individually. Batch ingestion won't support pretty-printed input, but it never did in the past, and it doesn't seem likely that it's a common requirement anyway. In the `false` case (streaming ingestion), the JsonReader would use the ObjectMapper.readValues API, and it would support all these different kinds of payloads. But if there's an error, it would reject the entire payload. That all-or-nothing behavior seems OK for a streaming ingestion use case.
   
   IMO Option 2 sounds nicer, since it doesn't involve doing weird stuff with the ObjectReader APIs to try to skip unparseable data.
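
A rough sketch of the two behaviours Option 2 describes, using only the JDK. Everything named here (`LineSplittableSketch`, `Result`, `toyParse`, `toyParseAll`) is hypothetical: the toy parsers merely check that a line looks like `{...}` and stand in for the real per-line `ObjectMapper.readValue` call and the whole-payload `ObjectMapper.readValues` call, so only the error-handling policy is illustrated, not real JSON parsing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

class LineSplittableSketch
{
  /** Parsed rows plus the number of units (lines or payloads) that were rejected. */
  static final class Result<T>
  {
    final List<T> rows;
    final int unparseable;

    Result(List<T> rows, int unparseable)
    {
      this.rows = rows;
      this.unparseable = unparseable;
    }
  }

  /** Batch-style mode: every line is parsed on its own; a bad line is counted and skipped. */
  static <T> Result<T> readLineByLine(String payload, Function<String, T> parseLine)
  {
    List<T> rows = new ArrayList<>();
    int unparseable = 0;
    for (String line : payload.split("\n")) {
      if (line.trim().isEmpty()) {
        continue;
      }
      try {
        rows.add(parseLine.apply(line));
      }
      catch (RuntimeException e) {
        unparseable++;
      }
    }
    return new Result<>(rows, unparseable);
  }

  /** Streaming-style mode: the payload is parsed in one pass; any error rejects all of it. */
  static <T> Result<T> readAllOrNothing(String payload, Function<String, List<T>> parseAll)
  {
    try {
      return new Result<>(parseAll.apply(payload), 0);
    }
    catch (RuntimeException e) {
      return new Result<>(Collections.emptyList(), 1);
    }
  }

  /** Toy stand-in for a per-line JSON parse: accepts anything shaped like {...}. */
  static String toyParse(String s)
  {
    String t = s.trim();
    if (!t.startsWith("{") || !t.endsWith("}")) {
      throw new IllegalArgumentException("not an object: " + t);
    }
    return t;
  }

  /** Toy stand-in for a whole-payload parse: fails on the first bad document. */
  static List<String> toyParseAll(String payload)
  {
    List<String> out = new ArrayList<>();
    for (String line : payload.split("\n")) {
      out.add(toyParse(line));
    }
    return out;
  }

  public static void main(String[] args)
  {
    String payload = "{\"a\":1}\nnot json\n{\"a\":2}";
    Result<String> perLine = readLineByLine(payload, LineSplittableSketch::toyParse);
    Result<String> whole = readAllOrNothing(payload, LineSplittableSketch::toyParseAll);
    System.out.println(perLine.rows.size() + " rows, " + perLine.unparseable + " unparseable");
    System.out.println(whole.rows.size() + " rows, " + whole.unparseable + " unparseable");
  }
}
```

With the three-line payload in `main`, the per-line mode keeps the two good rows and counts one unparseable line, while the all-or-nothing mode returns no rows at all, which is the trade-off described above for streaming ingestion.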




[GitHub] [druid] FrankChen021 commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r502122105



##########
File path: core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java
##########
@@ -69,7 +69,7 @@ public void testEquals()
               new ObjectMapper(),
               new ObjectMapper()
               )
-              .withIgnoredFields("objectMapper")
+              .withIgnoredFields("objectMapper", "lineSplittable")

Review comment:
       Good idea!






[GitHub] [druid] jihoonson commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r487371583



##########
File path: core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
##########
@@ -49,41 +55,99 @@
       boolean keepNullColumns
   )
   {
-    super(inputRowSchema, source);
+    this.inputRowSchema = inputRowSchema;
+    this.source = source;
     this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns));
     this.mapper = mapper;
   }
 
-  @Override
-  public List<InputRow> parseInputRows(String line) throws IOException, ParseException
-  {
-    final JsonNode document = mapper.readValue(line, JsonNode.class);
-    final Map<String, Object> flattened = flattener.flatten(document);
-    return Collections.singletonList(MapInputRowParser.parse(getInputRowSchema(), flattened));
-  }
 
   @Override
-  public Map<String, Object> toMap(String intermediateRow) throws IOException
+  public CloseableIterator<InputRow> read() throws IOException
   {
-    //noinspection unchecked
-    return mapper.readValue(intermediateRow, Map.class);
+    final MappingIterator<JsonNode> delegate = mapper.readValues(
+        new JsonFactory().createParser(this.source.open()),

Review comment:
       I haven't looked through the whole PR yet, but I noticed that the `parser` created from `JsonFactory` is `Closeable`, so it should be closed when the `CloseableIterator` below is closed. I will review again when you update the PR.
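
The ownership rule described here can be sketched with JDK types only. `ResourceBackedIterator` is a hypothetical name, and a `BufferedReader` stands in for the Jackson parser; the only point is that closing the iterator also closes the resource it reads from.

```java
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

class ResourceBackedIterator implements Iterator<String>, Closeable
{
  private final BufferedReader reader; // stand-in for the parser (assumption)
  private String nextLine;
  private boolean closed = false;

  ResourceBackedIterator(BufferedReader reader)
  {
    this.reader = reader;
  }

  @Override
  public boolean hasNext()
  {
    if (nextLine == null && !closed) {
      try {
        nextLine = reader.readLine();
      }
      catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
    return nextLine != null;
  }

  @Override
  public String next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    final String line = nextLine;
    nextLine = null;
    return line;
  }

  // Closing the iterator releases the underlying resource as well.
  @Override
  public void close() throws IOException
  {
    closed = true;
    nextLine = null;
    reader.close();
  }

  public static void main(String[] args) throws IOException
  {
    // try-with-resources closes the iterator, which in turn closes the reader.
    try (ResourceBackedIterator it = new ResourceBackedIterator(new BufferedReader(new StringReader("a\nb")))) {
      while (it.hasNext()) {
        System.out.println(it.next());
      }
    }
  }
}
```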






[GitHub] [druid] jihoonson commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r504295781



##########
File path: core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -96,23 +101,45 @@ public void close() throws IOException
   public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
   {
     return intermediateRowIterator().map(row -> {
-      final Map<String, Object> rawColumns;
+
+      final List<Map<String, Object>> rawColumnsList;
       try {
-        rawColumns = toMap(row);
+        rawColumnsList = toMap(row);
       }
       catch (Exception e) {
-        return InputRowListPlusRawValues.of(null, new ParseException(e, "Unable to parse row [%s] into JSON", row));
+        return Collections.singletonList(InputRowListPlusRawValues.of(null,
+                                                                      new ParseException(e, "Unable to parse row [%s] into JSON", row)));
+      }
+
+      if (CollectionUtils.isNullOrEmpty(rawColumnsList)) {
+        return Collections.singletonList(InputRowListPlusRawValues.of(null,
+                                                                      new ParseException("No map object parsed for row [%s]", row)));
       }
+
+      List<InputRow> rows;
       try {
-        return InputRowListPlusRawValues.of(parseInputRows(row), rawColumns);
+        rows = parseInputRows(row);
       }
       catch (ParseException e) {
-        return InputRowListPlusRawValues.of(rawColumns, e);
+        return rawColumnsList.stream().map(rawColumn -> InputRowListPlusRawValues.of(rawColumn, e)).collect(Collectors.toList());
       }
       catch (IOException e) {
-        return InputRowListPlusRawValues.of(rawColumns, new ParseException(e, "Unable to parse row [%s] into inputRow", row));
+        ParseException exception = new ParseException(e, "Unable to parse row [%s] into inputRow", row);
+        return rawColumnsList.stream().map(rawColumn -> InputRowListPlusRawValues.of(rawColumn, exception)).collect(Collectors.toList());
+      }

Review comment:
       Good point. I'm not sure whether we have a unit test for parsing the sampler response. If we do, CI will fail and we need to fix it in this PR as well. Otherwise, I'm OK with doing the fix in a follow-up. @vogievetsky do you know whether or not we have such a unit test?






[GitHub] [druid] FrankChen021 commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r500970256



##########
File path: core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -96,23 +99,35 @@ public void close() throws IOException
   public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
   {
     return intermediateRowIterator().map(row -> {
-      final Map<String, Object> rawColumns;
+
+      final List<Map<String, Object>> rawColumnsList;
       try {
-        rawColumns = toMap(row);
+        rawColumnsList = toMap(row);
       }
       catch (Exception e) {
-        return InputRowListPlusRawValues.of(null, new ParseException(e, "Unable to parse row [%s] into JSON", row));
+        return Collections.singletonList(InputRowListPlusRawValues.of(null,
+                                                                      new ParseException(e, "Unable to parse row [%s] into JSON", row)));
       }
+
+      List<InputRow> rows;
       try {
-        return InputRowListPlusRawValues.of(parseInputRows(row), rawColumns);
+        rows = parseInputRows(row);
       }
       catch (ParseException e) {
-        return InputRowListPlusRawValues.of(rawColumns, e);
+        return Collections.singletonList(InputRowListPlusRawValues.of(rawColumnsList.isEmpty() ? null : rawColumnsList.get(0), e));

Review comment:
       Returning the entire `rawColumnsList` here would require changes to `InputRowListPlusRawValues`, which is heavily used by test cases. 
   
   I checked the code again and found that there's no need to return the raw column object when an exception occurs, because `InputSourceSample` re-throws the exception once `InputRowListPlusRawValues.getParseException` returns one.  
   
   So the new commit passes `null` to `InputRowListPlusRawValues` to eliminate the strange code here.






[GitHub] [druid] jihoonson commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r521857100



##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
##########
@@ -1163,6 +1172,121 @@ public void testIndexParseException() throws IOException
     );
   }
 
+  /**
+   *
+   * This case tests sampling for multiple json lines in one text block
+   * Currently only RecordSupplierInputSource supports this kind of input, see https://github.com/apache/druid/pull/10383 for more information
+   *
+   * This test combines illegal json block and legal json block together to verify:
+   * 1. all lines in the illegal json block should not be parsed
+   * 2. the illegal json block should not affect the processing of the 2nd record
+   * 3. all lines in legal json block should be parsed successfully
+   *
+   */
+  @Test
+  public void testMultipleJsonStringInOneBlock() throws IOException
+  {
+    if (!ParserType.STR_JSON.equals(parserType) || !useInputFormatApi) {
+      return;
+    }
+
+    final TimestampSpec timestampSpec = new TimestampSpec("t", null, null);
+    final DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        ImmutableList.of(StringDimensionSchema.create("dim1PlusBar"))
+    );
+    final TransformSpec transformSpec = new TransformSpec(
+        null,
+        ImmutableList.of(new ExpressionTransform("dim1PlusBar", "concat(dim1 + 'bar')", TestExprMacroTable.INSTANCE))
+    );
+    final AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
+    final GranularitySpec granularitySpec = new UniformGranularitySpec(
+        Granularities.DAY,
+        Granularities.HOUR,
+        true,
+        null
+    );
+    final DataSchema dataSchema = createDataSchema(
+        timestampSpec,
+        dimensionsSpec,
+        aggregatorFactories,
+        granularitySpec,
+        transformSpec
+    );
+
+    List<String> jsonBlockList = ImmutableList.of(
+        // include the line which can't be parsed into JSON object to form a illegal json block
+        String.join("", STR_JSON_ROWS),
+
+        // exclude the last line to form a legal json block
+        String.join("", STR_JSON_ROWS.stream().limit(STR_JSON_ROWS.size() - 1).collect(Collectors.toList()))

Review comment:
       > Every time I push a commit to a branch that is being merge, I run the test cases in the module which contains the changes in that commit. If it's OK, I'll push the commit. But I find that there's a high probability that CI fails. Sometime it's related to inspection check, sometimes it's caused by failures of test cases in other modules, sometime it's about dependency check, sometime it has something with license check.
   > 
   > I wonder what steps do you follow to check before push a commit ? Do you run all the cases in all modules ? Or is there a simple way to run the checks mentioned above ?
   
   That's what I usually do as well. To be honest, unexpected CI failures annoy me too :sweat_smile: You can run those checks on your own by running the same commands that Travis runs. You may want to set up some pre-commit/post-commit hooks. The best would be automatic correction of trivial issues, but I'm not sure whether there is a tool for that which is mature and reliable enough.






[GitHub] [druid] FrankChen021 commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r512533944



##########
File path: core/src/main/java/org/apache/druid/data/input/InputRowListPlusRawValues.java
##########
@@ -82,8 +121,16 @@ private InputRowListPlusRawValues(
     return inputRows;
   }
 
+  /**
+   * This method is left here only for test cases
+   */
   @Nullable
   public Map<String, Object> getRawValues()
+  {
+    return CollectionUtils.isNullOrEmpty(rawValues) ? null : rawValues.get(0);

Review comment:
       But `Iterables.getOnlyElement` does not check whether the given input is null. If the given input is null, it will throw an NPE.
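
For illustration, a JDK-only sketch of a helper that tolerates null or empty input while still rejecting lists with more than one element, which is the check `Iterables.getOnlyElement` performs. `RawValuesAccess` and `onlyElementOrNull` are hypothetical names and are not part of the PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RawValuesAccess
{
  // Returns null for a null or empty list, the single element for a singleton
  // list, and fails loudly if the list unexpectedly holds several elements.
  static Map<String, Object> onlyElementOrNull(List<Map<String, Object>> rawValues)
  {
    if (rawValues == null || rawValues.isEmpty()) {
      return null;
    }
    if (rawValues.size() > 1) {
      throw new IllegalArgumentException("Expected at most one element but got " + rawValues.size());
    }
    return rawValues.get(0);
  }

  public static void main(String[] args)
  {
    final Map<String, Object> raw = new HashMap<>();
    raw.put("dim1", "foo");
    System.out.println(onlyElementOrNull(null));
    System.out.println(onlyElementOrNull(Collections.singletonList(raw)));
  }
}
```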






[GitHub] [druid] jihoonson commented on pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-717536861


   @FrankChen021 thanks. I left my last comments. Everything else looks good to me.




[GitHub] [druid] jihoonson commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r491620988



##########
File path: core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
##########
@@ -41,6 +42,20 @@
   private final ObjectMapper objectMapper;
   private final boolean keepNullColumns;
 
+  /**
+   * <pre>
+   * This parameter is introduced to support json string of an object in multiple lines in streaming ingestion records
+   *
+   * It indicates whether the input text can be splitted into lines in first, which will then be parsed into JSON objects one by one independently.

Review comment:
       typo: splitted -> split.

##########
File path: core/src/main/java/org/apache/druid/data/input/ExceptionThrowingIterator.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+public class ExceptionThrowingIterator<T> implements CloseableIterator<T>

Review comment:
       As this class is extracted now, please add some Javadoc describing what this class does and where it is used.

##########
File path: core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
##########
@@ -33,13 +40,98 @@
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
-public class JsonReader extends TextReader
+/**
+ * <pre>
+ * In constract to {@link JsonLineReader} which processes input text line by line independently,
+ * this class tries to parse the input text as a whole to an array of objects.
+ *
+ * The input text can be:
+ * 1. a JSON string of an object in a line or multiple lines(such as pretty-printed JSON text)
+ * 2. multiple JSON object strings concated by white space character(s)
+ *
+ * For case 2, what should be noticed is that if an exception is thrown when parsing one JSON string,
+ * the rest JSON text will all be ignored
+ *
+ * For more information, see: https://github.com/apache/druid/pull/10383
+ * </pre>
+ */
+public class JsonReader implements InputEntityReader

Review comment:
       Thinking about this new reader, I think the new requirement of parsing multiple JSON objects into multiple rows doesn't fit the current interface of the sampler. The sampler currently assumes that _there is only one JSON object in an input chunk_, which could have either an array or a nested object. That means the current interface allows multiple output rows, since `inputRows` is a list of `InputRow`s in `InputRowListPlusRawValues`, but doesn't allow multiple values in an input chunk, since `rawValues` is a single map in `InputRowListPlusRawValues`. To support the requirement, I think `rawValues` should be a list of maps, so that all raw values in an input chunk can be returned to the sampler.
   
   If we do this, `JsonReader` can simply extend `IntermediateRowParsingReader`, where the intermediate row will be the [`inputText` created from `source.open()`](https://github.com/apache/druid/pull/10383/files#diff-cc591396000acab135e9bd4aa1f31e2aR72). Here is an example.
   
   ```java
   public class JsonReader2 extends IntermediateRowParsingReader<String>
   {
     private final ObjectFlattener<JsonNode> flattener;
     private final ObjectMapper mapper;
     private final InputEntity source;
     private final InputRowSchema inputRowSchema;
   
     JsonReader2(
         InputRowSchema inputRowSchema,
         InputEntity source,
         JSONPathSpec flattenSpec,
         ObjectMapper mapper,
         boolean keepNullColumns
     )
     {
       this.inputRowSchema = inputRowSchema;
       this.source = source;
       this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns));
       this.mapper = mapper;
     }
   
     @Override
     protected CloseableIterator<String> intermediateRowIterator() throws IOException
     {
       return CloseableIterators.withEmptyBaggage(
           Iterators.singletonIterator(IOUtils.toString(source.open(), StringUtils.UTF8_STRING))
       );
     }
   
     @Override
     protected List<InputRow> parseInputRows(String intermediateRow) throws IOException, ParseException
     {
       try (JsonParser parser = new JsonFactory().createParser(intermediateRow)) {
         final Iterator<JsonNode> delegate = mapper.readValues(parser, JsonNode.class);
         return FluentIterable.from(() -> delegate)
                              .transform(jsonNode -> MapInputRowParser.parse(inputRowSchema, flattener.flatten(jsonNode)))
                              .toList();
       }
     }
   
     @Override
     protected List<Map<String, Object>> toMap(String intermediateRow) throws IOException
     {
       try (JsonParser parser = new JsonFactory().createParser(intermediateRow)) {
         final Iterator<Map> delegate = mapper.readValues(parser, Map.class);
         return FluentIterable.from(() -> delegate).transform(map -> (Map<String, Object>) map).toList();
       }
     }
   }
   ```

##########
File path: core/src/main/java/org/apache/druid/data/input/ExceptionThrowingIterator.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+public class ExceptionThrowingIterator<T> implements CloseableIterator<T>

Review comment:
       If `JsonReader` extends `IntermediateRowParsingReader`, this class will not have to be extracted.

##########
File path: core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java
##########
@@ -126,138 +149,158 @@ public void testParseRowWithConditional() throws IOException
     try (CloseableIterator<InputRow> iterator = reader.read()) {
       int numActualIterations = 0;
       while (iterator.hasNext()) {
+
         final InputRow row = iterator.next();
-        Assert.assertEquals("test", Iterables.getOnlyElement(row.getDimension("bar")));
-        Assert.assertEquals(Collections.emptyList(), row.getDimension("foo"));
-        Assert.assertTrue(row.getDimension("baz").isEmpty());
-        numActualIterations++;
-      }
-      Assert.assertEquals(numExpectedIterations, numActualIterations);
-    }
-  }
 
-  @Test
-  public void testParseRowKeepNullColumns() throws IOException
-  {
-    final JsonInputFormat format = new JsonInputFormat(
-        new JSONPathSpec(
-            true,
-            ImmutableList.of(
-                new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg")
-            )
-        ),
-        null,
-        true
-    );
+        Assert.assertEquals(DateTimes.of("2019-01-01"), row.getTimestamp());
+        Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
+        Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
+        Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
+        Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
+        Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
 
-    final ByteEntity source = new ByteEntity(
-        StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"o\":{\"mg\":null}}")
-    );
+        Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
+        Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
+        Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
 
-    final InputEntityReader reader = format.createReader(
-        new InputRowSchema(
-            new TimestampSpec("timestamp", "iso", null),
-            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Collections.emptyList())),
-            Collections.emptyList()
-        ),
-        source,
-        null
-    );
-    final int numExpectedIterations = 1;
-    try (CloseableIterator<InputRow> iterator = reader.read()) {
-      int numActualIterations = 0;
-      while (iterator.hasNext()) {
-        final InputRow row = iterator.next();
-        Assert.assertEquals(Arrays.asList("path_omg", "timestamp", "bar", "foo"), row.getDimensions());
-        Assert.assertTrue(row.getDimension("bar").isEmpty());
-        Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
-        Assert.assertTrue(row.getDimension("path_omg").isEmpty());
         numActualIterations++;
       }
+
       Assert.assertEquals(numExpectedIterations, numActualIterations);
     }
   }
 
   @Test
-  public void testKeepNullColumnsWithNoNullValues() throws IOException
+  public void testInvalidJSONText() throws IOException
   {
     final JsonInputFormat format = new JsonInputFormat(
         new JSONPathSpec(
             true,
             ImmutableList.of(
-                new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg")
+                new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"),
+                new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"),
+                new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"),
+                new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"),
+                new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"),
+                new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
             )
         ),
         null,
-        true
+        null
     );
 
+    //make sure JsonReader is used
+    format.setLineSplittable(false);
+
     final ByteEntity source = new ByteEntity(
-        StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":1,\"foo\":\"x\",\"o\":{\"mg\":\"a\"}}")
+        StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}"
+                           + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4xxx,\"o\":{\"mg\":2}}" //baz property is illegal
+                           + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":3}}")
     );
 
     final InputEntityReader reader = format.createReader(
         new InputRowSchema(
             new TimestampSpec("timestamp", "iso", null),
-            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Collections.emptyList())),
+            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
             Collections.emptyList()
         ),
         source,
         null
     );
+
+    // the 2nd line is ill-formed, it stops to iterate to the 3rd line.
+    // So in total, only 1 lines has been parsed
     final int numExpectedIterations = 1;
+
     try (CloseableIterator<InputRow> iterator = reader.read()) {
       int numActualIterations = 0;
       while (iterator.hasNext()) {
-        final InputRow row = iterator.next();
-        Assert.assertEquals(Arrays.asList("path_omg", "timestamp", "bar", "foo"), row.getDimensions());
-        Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("bar")));
-        Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
-        Assert.assertEquals("a", Iterables.getOnlyElement(row.getDimension("path_omg")));
-        numActualIterations++;
+
+        try {
+          final InputRow row = iterator.next();
+
+          final String msgId = String.valueOf(++numActualIterations);
+          Assert.assertEquals(DateTimes.of("2019-01-01"), row.getTimestamp());
+          Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
+          Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
+          Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
+          Assert.assertEquals(msgId, Iterables.getOnlyElement(row.getDimension("path_omg")));
+          Assert.assertEquals(msgId, Iterables.getOnlyElement(row.getDimension("jq_omg")));
+
+          Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
+          Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
+          Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
+        }
+        catch (Exception e) {
+          //ignore the exception when parsing the 2nd

Review comment:
       Please verify that `e` is the exception we expect.
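
A minimal sketch of what such a check could look like, using only the JDK so it stays independent of the test framework. `ExpectedExceptionCheck` and `expectThrows` are hypothetical names, and `Integer.parseInt("4xxx")` merely stands in for parsing the ill-formed `baz` value; the exception type the real test should expect is not assumed here.

```java
class ExpectedExceptionCheck
{
  // Runs the action and returns the thrown exception if it has the expected
  // type. Fails if nothing is thrown or if a different exception is thrown,
  // instead of silently swallowing every Exception.
  static <T extends Throwable> T expectThrows(Class<T> expectedType, Runnable action)
  {
    try {
      action.run();
    }
    catch (Throwable t) {
      if (expectedType.isInstance(t)) {
        return expectedType.cast(t);
      }
      throw new AssertionError("Unexpected exception type: " + t.getClass().getName(), t);
    }
    throw new AssertionError("Expected " + expectedType.getName() + " but nothing was thrown");
  }

  public static void main(String[] args)
  {
    final NumberFormatException e = expectThrows(
        NumberFormatException.class,
        () -> Integer.parseInt("4xxx")
    );
    // The message can be checked too, so the test fails for unrelated errors.
    System.out.println(e.getMessage().contains("4xxx"));
  }
}
```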

##########
File path: core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
##########
@@ -41,6 +42,20 @@
   private final ObjectMapper objectMapper;
   private final boolean keepNullColumns;
 
+  /**
+   * <pre>

Review comment:
       The `pre` tag is for pre-formatted text such as source code. I think you don't have to use it in this case.

##########
File path: core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
##########
@@ -33,13 +40,98 @@
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
-public class JsonReader extends TextReader
+/**
+ * <pre>

Review comment:
       Same comment here. You don't need a `pre` tag.

##########
File path: core/src/main/java/org/apache/druid/data/input/ExceptionThrowingIterator.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+public class ExceptionThrowingIterator<T> implements CloseableIterator<T>
+{
+  private final RuntimeException exception;
+
+  private boolean thrown = false;
+
+  public ExceptionThrowingIterator(Throwable exception)
+  {
+    this.exception = exception instanceof RuntimeException
+                     ? (RuntimeException) exception
+                     : new RuntimeException(exception);
+  }
+
+  @Override
+  public boolean hasNext()
+  {
+    return !thrown;
+  }
+
+  @Override
+  public T next()
+  {
+    thrown = true;

Review comment:
       Hmm, SpotBugs flags this because `next()` can never throw `NoSuchElementException`. The warning is wrong in practice, since `next()` can never be called more than once. But let's make it happy by simply adding `if (!hasNext()) throw new NoSuchElementException();` here.
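
A JDK-only sketch of the guarded `next()`. `OneShotThrowingIterator` is a hypothetical stand-in for the class under review, and it implements `Iterator` plus `Closeable` rather than Druid's `CloseableIterator` so that it compiles on its own.

```java
import java.io.Closeable;
import java.util.Iterator;
import java.util.NoSuchElementException;

class OneShotThrowingIterator<T> implements Iterator<T>, Closeable
{
  private final RuntimeException exception;
  private boolean thrown = false;

  OneShotThrowingIterator(Throwable exception)
  {
    this.exception = exception instanceof RuntimeException
                     ? (RuntimeException) exception
                     : new RuntimeException(exception);
  }

  @Override
  public boolean hasNext()
  {
    return !thrown;
  }

  @Override
  public T next()
  {
    // Honour the Iterator contract once the stored exception has been thrown.
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    thrown = true;
    throw exception;
  }

  @Override
  public void close()
  {
    // Nothing to release.
  }

  public static void main(String[] args)
  {
    final OneShotThrowingIterator<String> it = new OneShotThrowingIterator<>(new IllegalStateException("boom"));
    try {
      it.next();
    }
    catch (IllegalStateException e) {
      System.out.println("first call: " + e.getMessage());
    }
    System.out.println("hasNext afterwards: " + it.hasNext());
  }
}
```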

##########
File path: core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
##########
@@ -73,7 +88,14 @@ public boolean isSplittable()
   @Override
   public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
   {
-    return new JsonReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns);
+    return this.lineSplittable ?
+           new JsonLineReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns) :
+           new JsonReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns);
+  }
+
+  public void setLineSplittable(boolean lineSplittable)

Review comment:
       This doesn't look pretty, but it is simple enough to handle this exceptional case. Maybe we could introduce another layer on top of `InputFormat` to make it prettier, but I don't think it would be worth it, at least at this point.

##########
File path: core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
##########
@@ -41,6 +42,20 @@
   private final ObjectMapper objectMapper;
   private final boolean keepNullColumns;
 
+  /**
+   * <pre>

Review comment:
       Hmm, I think the Javadoc should be clearer about what this flag means. How about rephrasing it as below?
   
   ```
   This parameter indicates whether or not the given InputEntity should be split by lines before parsing it. If it is set to true, the InputEntity must be split by lines first. If it is set to false, then, contrary to what you might expect, the InputEntity doesn't have to be split by lines first, but it can still contain multiple lines. An InputEntityReader created from this format will determine by itself whether line splitting is necessary.
   
   This parameter should always be true for batch ingestion and false for streaming ingestion. For more information, see: https://github.com/apache/druid/pull/10383.
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-691403590


   @FrankChen021 it sounds good to me :slightly_smiling_face: 




[GitHub] [druid] jihoonson commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r487371583



##########
File path: core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
##########
@@ -49,41 +55,99 @@
       boolean keepNullColumns
   )
   {
-    super(inputRowSchema, source);
+    this.inputRowSchema = inputRowSchema;
+    this.source = source;
     this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns));
     this.mapper = mapper;
   }
 
-  @Override
-  public List<InputRow> parseInputRows(String line) throws IOException, ParseException
-  {
-    final JsonNode document = mapper.readValue(line, JsonNode.class);
-    final Map<String, Object> flattened = flattener.flatten(document);
-    return Collections.singletonList(MapInputRowParser.parse(getInputRowSchema(), flattened));
-  }
 
   @Override
-  public Map<String, Object> toMap(String intermediateRow) throws IOException
+  public CloseableIterator<InputRow> read() throws IOException
   {
-    //noinspection unchecked
-    return mapper.readValue(intermediateRow, Map.class);
+    final MappingIterator<JsonNode> delegate = mapper.readValues(
+        new JsonFactory().createParser(this.source.open()),

Review comment:
       The parser created from `JsonFactory` is `Closeable`, so it should be closed when the `CloseableIterator` below is closed.
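
One way to tie the two lifetimes together, sketched with JDK types only: `ResourceClosingIterator` and `fakeParser` are hypothetical names, and a plain `Closeable` stands in for the Jackson parser.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical wrapper: iterates over a delegate and closes an
// associated resource when the iterator itself is closed.
class ResourceClosingIterator<T> implements Iterator<T>, Closeable
{
  private final Iterator<T> delegate;
  private final Closeable resource;

  ResourceClosingIterator(Iterator<T> delegate, Closeable resource)
  {
    this.delegate = delegate;
    this.resource = resource;
  }

  @Override
  public boolean hasNext()
  {
    return delegate.hasNext();
  }

  @Override
  public T next()
  {
    return delegate.next();
  }

  @Override
  public void close() throws IOException
  {
    // Closing the iterator releases the underlying resource
    // (the JSON parser, in the case discussed above).
    resource.close();
  }

  public static void main(String[] args) throws IOException
  {
    final boolean[] closed = {false};
    // Stand-in for the parser: it only records that close() was called.
    Closeable fakeParser = () -> closed[0] = true;

    try (ResourceClosingIterator<String> rows =
             new ResourceClosingIterator<>(Arrays.asList("a", "b").iterator(), fakeParser)) {
      while (rows.hasNext()) {
        System.out.println(rows.next());
      }
    }
    System.out.println("parser closed: " + closed[0]);
  }
}
```

Because the wrapper is `Closeable`, callers can use try-with-resources and the parser is released even if iteration fails partway through.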






[GitHub] [druid] FrankChen021 commented on pull request #10383: [WIP]Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-694798148


   @jihoonson I intended to do that, but after a deeper investigation of the original implementation, I found that it was not the root cause of the previous problem. Instead, I improved the exception handling in the new `JsonReader.sample`, following what `IntermediateRowParsingReader` does, to eliminate the test failures. So neither the test cases nor the sampling and ingestion behaviors had to be modified to adapt to this PR.
   
   The PR is now ready for review. I would like to hear your opinions on how the value of the `lineSplittable` parameter is passed; I think there should be a more graceful way.




[GitHub] [druid] jihoonson commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r520354156



##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
##########
@@ -1060,6 +1062,107 @@ public void testWithFilter() throws IOException
     );
   }
 
+  @Test
+  public void testIndexParseException() throws IOException

Review comment:
       Thanks for understanding. Do you want to add those tests in this PR? For the integration test failure, I don’t think it’s relevant to the changes in this PR. I just retriggered it. 






[GitHub] [druid] suneet-s commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
suneet-s commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r521083639



##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
##########
@@ -1163,6 +1172,121 @@ public void testIndexParseException() throws IOException
     );
   }
 
+  /**
+   *
+   * This case tests sampling for multiple json lines in one text block
+   * Currently only RecordSupplierInputSource supports this kind of input, see https://github.com/apache/druid/pull/10383 for more information
+   *
+   * This test combines illegal json block and legal json block together to verify:
+   * 1. all lines in the illegal json block should not be parsed
+   * 2. the illegal json block should not affect the processing of the 2nd record
+   * 3. all lines in legal json block should be parsed successfully
+   *
+   */
+  @Test
+  public void testMultipleJsonStringInOneBlock() throws IOException
+  {
+    if (!ParserType.STR_JSON.equals(parserType) || !useInputFormatApi) {
+      return;
+    }
+
+    final TimestampSpec timestampSpec = new TimestampSpec("t", null, null);
+    final DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        ImmutableList.of(StringDimensionSchema.create("dim1PlusBar"))
+    );
+    final TransformSpec transformSpec = new TransformSpec(
+        null,
+        ImmutableList.of(new ExpressionTransform("dim1PlusBar", "concat(dim1 + 'bar')", TestExprMacroTable.INSTANCE))
+    );
+    final AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
+    final GranularitySpec granularitySpec = new UniformGranularitySpec(
+        Granularities.DAY,
+        Granularities.HOUR,
+        true,
+        null
+    );
+    final DataSchema dataSchema = createDataSchema(
+        timestampSpec,
+        dimensionsSpec,
+        aggregatorFactories,
+        granularitySpec,
+        transformSpec
+    );
+
+    List<String> jsonBlockList = ImmutableList.of(
+        // include the line which can't be parsed into JSON object to form a illegal json block
+        String.join("", STR_JSON_ROWS),
+
+        // exclude the last line to form a legal json block
+        String.join("", STR_JSON_ROWS.stream().limit(STR_JSON_ROWS.size() - 1).collect(Collectors.toList()))

Review comment:
       Re-triggered LGTM






[GitHub] [druid] jihoonson commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r520286004



##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
##########
@@ -1060,6 +1062,107 @@ public void testWithFilter() throws IOException
     );
   }
 
+  @Test
+  public void testIndexParseException() throws IOException

Review comment:
       I think we need such tests because `KafkaIndexTaskTest` only covers the actual indexing side, but not the sampling side. Without those tests, we could break the sampler without noticing.
   
   > For test cases for `InputSourceSampler` here, the input source is `InlineInputSource` which always uses `InputEntityIteratingReader` even when input format is `JsonInputFormat`, there's no chance to test that. So I think we don't need add test cases here.
   
   Maybe this is not a good place for the new tests, but you don't have to use the `InlineInputSource` for them. Rather, you should use `RecordSupplierInputSource` as you mentioned. Since `RecordSupplierInputSource` accepts the `RecordSupplier` interface as a parameter which is the data supplier, I think you could implement a mock for testing (I don't think we should test `Firehose` as it's deprecated). Or do you see some reason you cannot? If it's hard, I'm also OK with adding them in a follow-up.






[GitHub] [druid] FrankChen021 commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r502122105



##########
File path: core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java
##########
@@ -69,7 +69,7 @@ public void testEquals()
               new ObjectMapper(),
               new ObjectMapper()
               )
-              .withIgnoredFields("objectMapper")
+              .withIgnoredFields("objectMapper", "lineSplittable")

Review comment:
       Good idea!






[GitHub] [druid] FrankChen021 commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r521001837



##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
##########
@@ -1163,6 +1172,121 @@ public void testIndexParseException() throws IOException
     );
   }
 
+  /**
+   *
+   * This case tests sampling for multiple json lines in one text block
+   * Currently only RecordSupplierInputSource supports this kind of input, see https://github.com/apache/druid/pull/10383 for more information
+   *
+   * This test combines illegal json block and legal json block together to verify:
+   * 1. all lines in the illegal json block should not be parsed
+   * 2. the illegal json block should not affect the processing of the 2nd record
+   * 3. all lines in legal json block should be parsed successfully
+   *
+   */
+  @Test
+  public void testMultipleJsonStringInOneBlock() throws IOException
+  {
+    if (!ParserType.STR_JSON.equals(parserType) || !useInputFormatApi) {
+      return;
+    }
+
+    final TimestampSpec timestampSpec = new TimestampSpec("t", null, null);
+    final DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        ImmutableList.of(StringDimensionSchema.create("dim1PlusBar"))
+    );
+    final TransformSpec transformSpec = new TransformSpec(
+        null,
+        ImmutableList.of(new ExpressionTransform("dim1PlusBar", "concat(dim1 + 'bar')", TestExprMacroTable.INSTANCE))
+    );
+    final AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
+    final GranularitySpec granularitySpec = new UniformGranularitySpec(
+        Granularities.DAY,
+        Granularities.HOUR,
+        true,
+        null
+    );
+    final DataSchema dataSchema = createDataSchema(
+        timestampSpec,
+        dimensionsSpec,
+        aggregatorFactories,
+        granularitySpec,
+        transformSpec
+    );
+
+    List<String> jsonBlockList = ImmutableList.of(
+        // include the line which can't be parsed into JSON object to form a illegal json block
+        String.join("", STR_JSON_ROWS),
+
+        // exclude the last line to form a legal json block
+        String.join("", STR_JSON_ROWS.stream().limit(STR_JSON_ROWS.size() - 1).collect(Collectors.toList()))

Review comment:
       Every time I push a commit to a branch that is being merged, I run the test cases in the module that contains the changes in that commit. If they pass, I push the commit. But I find that there's a high probability that CI fails. Sometimes it's related to the inspection check, sometimes it's caused by test failures in other modules, sometimes it's the dependency check, and sometimes it has something to do with the license check.
   
   I wonder what steps you follow to check before pushing a commit. Do you run all the test cases in all modules? Or is there a simple way to run the checks mentioned above?






[GitHub] [druid] jihoonson commented on pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-727056771


   I'm merging this PR. Thanks @FrankChen021!




[GitHub] [druid] gianm commented on pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
gianm commented on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-691239531


   > I don't know whether jackson provides anyway to skip the ill-formed text, but I guess there's no such API for a parser.
   > So what's your suggestion on this problem ? @gianm @jihoonson
   
   Hmm, this is definitely a problem. When we're reading JSON from a file, we should be skipping the "lines" that aren't parseable, and marking them as unparseable. I guess that ObjectMapper API was too good to be true 🙂
   
   Maybe we can do one of the following two things.
   
   Option 1: After an error we can manually skip to the line that starts with `{` and begin parsing there. Because the JsonParser has a buffer, we need to make sure we don't miss buffered but unparsed content. We might be able to do that with the `JsonParser.releaseBuffered` method, assuming it works properly after an error.
   
   Option 2: Introduce a `lineSplittable` parameter for the JsonReader like you originally suggested, but it would behave differently in two ways. First: it wouldn't be available on the JsonInputFormat. Instead, it would be automatically set to `true` for batch ingestion and `false` for streaming ingestion. Second: `false` doesn't mean "not line splittable", it just means "we should auto detect". In the `true` case (batch ingestion), the JsonReader will split on lines and parse each line individually. Batch ingestion won't support pretty-printed input, but it never did in the past, and it doesn't seem likely that it's a common requirement anyway. In the `false` case (streaming ingestion), the JsonReader would use the ObjectMapper.readValues API, and it would support all these different kinds of payloads. But if there's an error, it would reject the entire payload. That all-or-nothing behavior seems OK for a streaming ingestion use case.
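
The behavioral difference in Option 2 could be sketched roughly as follows. This is not Druid code: `PayloadParsingModes` and its methods are hypothetical, `Integer::parseInt` stands in for JSON parsing, and records are assumed to be one per line, so the sketch does not model pretty-printed input.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration of the two modes described in Option 2.
class PayloadParsingModes
{
  // Batch style: each line is parsed on its own; a bad line is
  // recorded as unparseable and the remaining lines are still read.
  static <T> List<T> parseLineByLine(String payload, Function<String, T> parser, List<String> unparseable)
  {
    List<T> rows = new ArrayList<>();
    for (String line : payload.split("\n")) {
      try {
        rows.add(parser.apply(line));
      }
      catch (RuntimeException e) {
        unparseable.add(line);
      }
    }
    return rows;
  }

  // Streaming style: the first failure propagates to the caller,
  // so the whole payload is rejected and no partial rows are returned.
  static <T> List<T> parseAllOrNothing(String payload, Function<String, T> parser)
  {
    List<T> rows = new ArrayList<>();
    for (String record : payload.split("\n")) {
      rows.add(parser.apply(record));
    }
    return rows;
  }

  public static void main(String[] args)
  {
    Function<String, Integer> parser = Integer::parseInt; // stand-in for a JSON parser
    String payload = "1\n2\nx\n4";

    List<String> unparseable = new ArrayList<>();
    List<Integer> rows = parseLineByLine(payload, parser, unparseable);
    System.out.println("line by line: rows=" + rows + ", unparseable=" + unparseable);

    try {
      parseAllOrNothing(payload, parser);
    }
    catch (NumberFormatException e) {
      System.out.println("all or nothing: payload rejected");
    }
  }
}
```

With the same malformed payload, the first mode keeps the three good rows and reports one bad line, while the second yields nothing at all.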




[GitHub] [druid] FrankChen021 commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r512544300



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
##########
@@ -131,17 +130,27 @@ public SamplerResponse sample(
             continue;
           }
 
-          for (InputRow row : inputRowListPlusRawValues.getInputRows()) {
-            index.add(new SamplerInputRow(row, counter), true);
+          for (int i = 0; i < rawColumnsList.size(); i++) {
+            Map<String, Object> rawColumns = rawColumnsList.get(i);
+            InputRow row = inputRowListPlusRawValues.getInputRows().get(i);
+
+            //keep the index of the row to be added to responseRows for further use
+            final int rowIndex = responseRows.size();
+            index.add(new SamplerInputRow(row, rowIndex), true);
+
             // store the raw value; will be merged with the data from the IncrementalIndex later
-            responseRows[counter] = new SamplerResponseRow(rawColumns, null, null, null);
-            counter++;
+            responseRows.add(new SamplerResponseRow(rawColumns, null, null, null));
             numRowsIndexed++;
           }
         }
         catch (ParseException e) {
-          responseRows[counter] = new SamplerResponseRow(rawColumns, null, true, e.getMessage());
-          counter++;
+          if (rawColumnsList != null) {
+            responseRows.addAll(rawColumnsList.stream()
+                                              .map(rawColumns -> new SamplerResponseRow(rawColumns, null, true, e.getMessage()))
+                                              .collect(Collectors.toList()));

Review comment:
       I noticed that `index.add` does not throw `ParseException`; it returns the exception as part of its result. The previous version of `InputSourceSampler` checked the result and rethrew this exception, but the [change](https://github.com/apache/druid/pull/10336/files#diff-a35198e960bc90b96d559e9ea93be1d4de52655e9a4f04016173114952ae0001) introduced by #10336 deleted that exception-handling code, which means the code here won't throw `ParseException`.
   
   Of course, it's possible that the code in the for-loop will throw `ParseException` in the future. I've made small changes to guard against that. Let's check once the CI passes.
   
   BTW, my local branch has failed to build since the last time it was rebased on master, so I have to rely on the CI to check whether there are any test failures.
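
To illustrate the point about errors being returned rather than thrown, here is a rough sketch under stated assumptions: `AddResult`, `add`, and `sample` are hypothetical stand-ins, not the Druid classes, and the response rows are plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: an add() that reports parse errors in its
// return value, and a loop that must inspect the result explicitly.
class SamplerLoopSketch
{
  // Stand-in for the value returned by an index add call.
  static final class AddResult
  {
    final String parseError; // null when the row was added cleanly

    AddResult(String parseError)
    {
      this.parseError = parseError;
    }
  }

  // Hypothetical add(): negative values count as unparseable, and the
  // error is carried in the result instead of being thrown.
  static AddResult add(int value)
  {
    return value < 0 ? new AddResult("negative value: " + value) : new AddResult(null);
  }

  static List<String> sample(List<Integer> values)
  {
    List<String> responseRows = new ArrayList<>();
    for (int value : values) {
      // Position the row will occupy, captured before it is appended.
      final int rowIndex = responseRows.size();
      AddResult result = add(value);
      if (result.parseError != null) {
        // No catch block would ever see this error; it has to be checked here.
        responseRows.add(rowIndex + ": unparseable (" + result.parseError + ")");
      } else {
        responseRows.add(rowIndex + ": ok " + value);
      }
    }
    return responseRows;
  }

  public static void main(String[] args)
  {
    for (String row : sample(Arrays.asList(1, -2, 3))) {
      System.out.println(row);
    }
  }
}
```

If the caller skips the check on the result, the error is silently dropped, which is why a surrounding `catch` for the parse exception alone is not enough in this style.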






[GitHub] [druid] lgtm-com[bot] commented on pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-709996561


   This pull request **introduces 1 alert** when merging fd34e1d0f6b762ba6cab93b966ccbe3ef1a79374 into 3538abd5d064d11a03cea6458782e5f4b52d3c45 - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-490305dc895654701985c7b9d3a995a7b554e192)
   
   **new alerts:**
   
   * 1 for Dereferenced variable may be null




[GitHub] [druid] lgtm-com[bot] commented on pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-719448312


   This pull request **introduces 1 alert** when merging c1157d2f13f8cff3e3539fb64555c2eb1cba3c21 into 9c51047cc8f1aa49b77bbfc7a1f0cc14df691a6b - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-c22a90cb7d47bae48d6bc964378e41d971265d33)
   
   **new alerts:**
   
   * 1 for Dereferenced variable may be null




[GitHub] [druid] suneet-s commented on pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
suneet-s commented on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-691219861








[GitHub] [druid] FrankChen021 commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r521079741



##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
##########
@@ -1163,6 +1172,121 @@ public void testIndexParseException() throws IOException
     );
   }
 
+  /**
+   *
+   * This case tests sampling for multiple json lines in one text block
+   * Currently only RecordSupplierInputSource supports this kind of input, see https://github.com/apache/druid/pull/10383 for more information
+   *
+   * This test combines illegal json block and legal json block together to verify:
+   * 1. all lines in the illegal json block should not be parsed
+   * 2. the illegal json block should not affect the processing of the 2nd record
+   * 3. all lines in legal json block should be parsed successfully
+   *
+   */
+  @Test
+  public void testMultipleJsonStringInOneBlock() throws IOException
+  {
+    if (!ParserType.STR_JSON.equals(parserType) || !useInputFormatApi) {
+      return;
+    }
+
+    final TimestampSpec timestampSpec = new TimestampSpec("t", null, null);
+    final DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        ImmutableList.of(StringDimensionSchema.create("dim1PlusBar"))
+    );
+    final TransformSpec transformSpec = new TransformSpec(
+        null,
+        ImmutableList.of(new ExpressionTransform("dim1PlusBar", "concat(dim1 + 'bar')", TestExprMacroTable.INSTANCE))
+    );
+    final AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
+    final GranularitySpec granularitySpec = new UniformGranularitySpec(
+        Granularities.DAY,
+        Granularities.HOUR,
+        true,
+        null
+    );
+    final DataSchema dataSchema = createDataSchema(
+        timestampSpec,
+        dimensionsSpec,
+        aggregatorFactories,
+        granularitySpec,
+        transformSpec
+    );
+
+    List<String> jsonBlockList = ImmutableList.of(
+        // include the line which can't be parsed into JSON object to form a illegal json block
+        String.join("", STR_JSON_ROWS),
+
+        // exclude the last line to form a legal json block
+        String.join("", STR_JSON_ROWS.stream().limit(STR_JSON_ROWS.size() - 1).collect(Collectors.toList()))

Review comment:
       Could you re-trigger the CI? The failure is not related to the changes in this PR.
   
   
   
   ```
   Failed with bad exit code during 'Extraction'
   
   [ERROR] Failed to execute goal on project druid-sql: Could not resolve dependencies for project org.apache.druid:druid-sql:jar:0.21.0-SNAPSHOT: Failed to collect dependencies at org.apache.calcite:calcite-core:jar:1.21.0 -> org.codehaus.janino:janino:jar:3.0.11: Failed to read artifact descriptor for org.codehaus.janino:janino:jar:3.0.11: Could not transfer artifact org.codehaus.janino:janino-parent:pom:3.0.11 from/to central (https://repo.maven.apache.org/maven2): Failed to transfer file https://repo.maven.apache.org/maven2/org/codehaus/janino/janino-parent/3.0.11/janino-parent-3.0.11.pom with status code 503 -> [Help 1]
   ```






[GitHub] [druid] FrankChen021 commented on pull request #10383: [WIP]Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-693185689


   Marked as WIP temporarily.
   
   `JsonReader.hasNext`, which calls Jackson's `MappingIterator.hasNext`, throws an exception when the JSON text is ill-formed. This causes some old test cases on `sample` to fail, while the test cases on `read` pass. The reason is that stream ingestion handles records one by one: if one record fails to parse, the exception is caught and handled gracefully. `sample`, by contrast, treats all records as a whole, so if one record fails, the whole sample process aborts.
   
   It would take some time to handle this problem.
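
The difference in failure scope described above can be illustrated with a minimal sketch. It is not Druid code: `FailureScope`, `parseEach` and `parseAll` are hypothetical names, and `Integer.parseInt` merely stands in for JSON parsing so that the example needs no external library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: Integer.parseInt stands in for JSON parsing; none of this is Druid code.
final class FailureScope
{
  // Stream-ingestion style: each record is parsed on its own, so a bad record is skipped.
  static List<Integer> parseEach(List<String> records)
  {
    List<Integer> parsed = new ArrayList<>();
    for (String record : records) {
      try {
        parsed.add(Integer.parseInt(record));
      }
      catch (NumberFormatException e) {
        // Handled per record; the loop continues with the next record.
      }
    }
    return parsed;
  }

  // Whole-batch style: the first failure escapes the loop and nothing is returned.
  static List<Integer> parseAll(List<String> records)
  {
    List<Integer> parsed = new ArrayList<>();
    for (String record : records) {
      parsed.add(Integer.parseInt(record));
    }
    return parsed;
  }

  public static void main(String[] args)
  {
    List<String> records = Arrays.asList("1", "not-a-number", "3");
    System.out.println(parseEach(records));
    try {
      parseAll(records);
    }
    catch (NumberFormatException e) {
      System.out.println("batch aborted: " + e.getMessage());
    }
  }
}
```

With the same input, the per-record variant keeps the two good records, while the whole-batch variant loses everything once the second record fails.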




[GitHub] [druid] jihoonson commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r499784043



##########
File path: core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -123,7 +138,7 @@ public void close() throws IOException
 
   /**
    * Parses the given intermediate row into a list of {@link InputRow}s.
-   * This should return a non-empty list.
+   * This should return a non-empty list

Review comment:
       Accidental change?

##########
File path: core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -96,23 +99,35 @@ public void close() throws IOException
   public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
   {
     return intermediateRowIterator().map(row -> {
-      final Map<String, Object> rawColumns;
+
+      final List<Map<String, Object>> rawColumnsList;
       try {
-        rawColumns = toMap(row);
+        rawColumnsList = toMap(row);
       }
       catch (Exception e) {
-        return InputRowListPlusRawValues.of(null, new ParseException(e, "Unable to parse row [%s] into JSON", row));
+        return Collections.singletonList(InputRowListPlusRawValues.of(null,
+                                                                      new ParseException(e, "Unable to parse row [%s] into JSON", row)));
       }
+
+      List<InputRow> rows;
       try {
-        return InputRowListPlusRawValues.of(parseInputRows(row), rawColumns);
+        rows = parseInputRows(row);
       }
       catch (ParseException e) {
-        return InputRowListPlusRawValues.of(rawColumns, e);
+        return Collections.singletonList(InputRowListPlusRawValues.of(rawColumnsList.isEmpty() ? null : rawColumnsList.get(0), e));
       }
       catch (IOException e) {
-        return InputRowListPlusRawValues.of(rawColumns, new ParseException(e, "Unable to parse row [%s] into inputRow", row));
+        return Collections.singletonList(InputRowListPlusRawValues.of(rawColumnsList.isEmpty() ? null : rawColumnsList.get(0),
+                                                                      new ParseException(e, "Unable to parse row [%s] into inputRow", row)));
+      }
+
+      List<InputRowListPlusRawValues> list = new ArrayList<InputRowListPlusRawValues>();
+      for (int i = 0; i < Math.min(rows.size(), rawColumnsList.size()); i++) {

Review comment:
       I think we should check that `rawColumnsList` and `rows` have the same size (as the Javadoc contract states) instead of taking the minimum of the two.
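
A minimal sketch of the strict check suggested above, as opposed to iterating up to `Math.min(...)`. The `StrictPairing` class and its `pair` method are hypothetical and not part of Druid; plain strings stand in for `InputRow` and the raw column maps.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Druid: pairs parsed rows with raw values by position.
final class StrictPairing
{
  static <R, V> List<Map.Entry<R, V>> pair(List<R> rows, List<V> rawValues)
  {
    // Fail loudly on a contract violation instead of silently dropping the surplus elements.
    if (rows.size() != rawValues.size()) {
      throw new IllegalStateException(
          "Expected " + rawValues.size() + " rows but got " + rows.size()
      );
    }
    List<Map.Entry<R, V>> paired = new ArrayList<>(rows.size());
    for (int i = 0; i < rows.size(); i++) {
      paired.add(new SimpleImmutableEntry<>(rows.get(i), rawValues.get(i)));
    }
    return paired;
  }

  public static void main(String[] args)
  {
    System.out.println(pair(Arrays.asList("row1", "row2"), Arrays.asList("raw1", "raw2")));
    try {
      pair(Arrays.asList("row1"), Arrays.asList("raw1", "raw2"));
    }
    catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Taking the minimum would hide a mismatch between the two lists; the explicit check surfaces it at the point where the contract is broken.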

##########
File path: core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -96,23 +99,35 @@ public void close() throws IOException
   public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
   {
     return intermediateRowIterator().map(row -> {
-      final Map<String, Object> rawColumns;
+
+      final List<Map<String, Object>> rawColumnsList;
       try {
-        rawColumns = toMap(row);
+        rawColumnsList = toMap(row);
       }
       catch (Exception e) {
-        return InputRowListPlusRawValues.of(null, new ParseException(e, "Unable to parse row [%s] into JSON", row));
+        return Collections.singletonList(InputRowListPlusRawValues.of(null,
+                                                                      new ParseException(e, "Unable to parse row [%s] into JSON", row)));
       }
+
+      List<InputRow> rows;
       try {
-        return InputRowListPlusRawValues.of(parseInputRows(row), rawColumns);
+        rows = parseInputRows(row);
       }
       catch (ParseException e) {
-        return InputRowListPlusRawValues.of(rawColumns, e);
+        return Collections.singletonList(InputRowListPlusRawValues.of(rawColumnsList.isEmpty() ? null : rawColumnsList.get(0), e));

Review comment:
       Should we return the entire `rawColumnsList` instead of the first element? I thought `InputRowListPlusRawValues` would be able to hold a list of `rawColumns`.

##########
File path: core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
##########
@@ -49,41 +73,57 @@
       boolean keepNullColumns
   )
   {
-    super(inputRowSchema, source);
+    this.inputRowSchema = inputRowSchema;
+    this.source = source;
     this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns));
     this.mapper = mapper;
   }
 
   @Override
-  public List<InputRow> parseInputRows(String line) throws IOException, ParseException
+  protected CloseableIterator<String> intermediateRowIterator() throws IOException
   {
-    final JsonNode document = mapper.readValue(line, JsonNode.class);
-    final Map<String, Object> flattened = flattener.flatten(document);
-    return Collections.singletonList(MapInputRowParser.parse(getInputRowSchema(), flattened));
+    return CloseableIterators.withEmptyBaggage(
+        Iterators.singletonIterator(IOUtils.toString(source.open(), StringUtils.UTF8_STRING))
+    );
   }
 
   @Override
-  public Map<String, Object> toMap(String intermediateRow) throws IOException
+  protected List<InputRow> parseInputRows(String intermediateRow) throws IOException, ParseException
   {
-    //noinspection unchecked
-    return mapper.readValue(intermediateRow, Map.class);
-  }
+    try (JsonParser parser = new JsonFactory().createParser(intermediateRow)) {
+      final Iterator<JsonNode> delegate = mapper.readValues(parser, JsonNode.class);
+      return FluentIterable.from(() -> delegate)
+                           .transform(jsonNode -> MapInputRowParser.parse(inputRowSchema, flattener.flatten(jsonNode)))
+                           .toList();
+    }
+    catch (RuntimeException e) {
+      //convert Jackson's JsonParseException into druid's exception for further processing
+      if (e.getCause() instanceof JsonParseException) {

Review comment:
       Where is `JsonParseException` thrown wrapped in `RuntimeException`? Can you add a comment about it?

##########
File path: core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -96,23 +99,35 @@ public void close() throws IOException
   public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
   {
     return intermediateRowIterator().map(row -> {
-      final Map<String, Object> rawColumns;
+
+      final List<Map<String, Object>> rawColumnsList;
       try {
-        rawColumns = toMap(row);
+        rawColumnsList = toMap(row);
       }
       catch (Exception e) {
-        return InputRowListPlusRawValues.of(null, new ParseException(e, "Unable to parse row [%s] into JSON", row));
+        return Collections.singletonList(InputRowListPlusRawValues.of(null,
+                                                                      new ParseException(e, "Unable to parse row [%s] into JSON", row)));
       }
+
+      List<InputRow> rows;
       try {
-        return InputRowListPlusRawValues.of(parseInputRows(row), rawColumns);
+        rows = parseInputRows(row);
       }
       catch (ParseException e) {
-        return InputRowListPlusRawValues.of(rawColumns, e);
+        return Collections.singletonList(InputRowListPlusRawValues.of(rawColumnsList.isEmpty() ? null : rawColumnsList.get(0), e));

Review comment:
       Also, should we fail if `rawColumnsList.isEmpty()`?

##########
File path: core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java
##########
@@ -69,7 +69,7 @@ public void testEquals()
               new ObjectMapper(),
               new ObjectMapper()
               )
-              .withIgnoredFields("objectMapper")
+              .withIgnoredFields("objectMapper", "lineSplittable")

Review comment:
       `lineSplittable` shouldn't be ignored, but included in `equals()` and `hashCode()`.






[GitHub] [druid] FrankChen021 edited a comment on pull request #10383: [WIP]Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 edited a comment on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-693185689


   Marked as WIP temporarily.
   
   `JsonReader.hasNext`, which calls Jackson's `MappingIterator.hasNext`, throws an exception when the JSON text is ill-formed. This causes some old test cases on `sample` to fail, while the test cases on `read` pass. The reason is that stream ingestion handles records one by one: if one record fails to parse, the exception is caught and handled gracefully. `sample`, by contrast, treats all records as a whole, so if one record fails, the whole sample process aborts.
   
   It would take some time to handle this problem.




[GitHub] [druid] jihoonson commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r502078014



##########
File path: core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -96,23 +99,35 @@ public void close() throws IOException
   public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
   {
     return intermediateRowIterator().map(row -> {
-      final Map<String, Object> rawColumns;
+
+      final List<Map<String, Object>> rawColumnsList;
       try {
-        rawColumns = toMap(row);
+        rawColumnsList = toMap(row);
       }
       catch (Exception e) {
-        return InputRowListPlusRawValues.of(null, new ParseException(e, "Unable to parse row [%s] into JSON", row));
+        return Collections.singletonList(InputRowListPlusRawValues.of(null,
+                                                                      new ParseException(e, "Unable to parse row [%s] into JSON", row)));
       }
+
+      List<InputRow> rows;
       try {
-        return InputRowListPlusRawValues.of(parseInputRows(row), rawColumns);
+        rows = parseInputRows(row);
       }
       catch (ParseException e) {
-        return InputRowListPlusRawValues.of(rawColumns, e);
+        return Collections.singletonList(InputRowListPlusRawValues.of(rawColumnsList.isEmpty() ? null : rawColumnsList.get(0), e));

Review comment:
       > I checked the code again and find that there's no need to return raw column object when exception occurs. Because `InputSourceSample` re-throws exception once `InputRowListPlusRawValues.getParseException` returns exception.
   
   [Actually, there is a clause catching `ParseException`](https://github.com/apache/druid/blob/master/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java#L142-L146) in the sampler (this is a bad pattern by the way. We should refactor it at some point). In this case, the `rawColumns` is returned to show the raw data in the UI. I think the entire `rawColumnsList` should be returned to keep the current behaviour.

##########
File path: core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java
##########
@@ -69,7 +69,7 @@ public void testEquals()
               new ObjectMapper(),
               new ObjectMapper()
               )
-              .withIgnoredFields("objectMapper")
+              .withIgnoredFields("objectMapper", "lineSplittable")

Review comment:
       Hmm, good point. Yeah, it's not good practice to include a mutable field in `equals()` or `hashCode()`, because the object can be used as a key in a `HashMap` or as an element of a `HashSet`. I would suggest making `lineSplittable` immutable and, instead of `setLineSplittable()`, adding a new method `withLineSplittable(boolean lineSplittable)` that creates a new `JsonInputFormat` instance with the given value.
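
A minimal sketch of that "wither" pattern, under stated assumptions: `FormatSketch` is a hypothetical stand-in for `JsonInputFormat` that carries only two boolean fields, so the real class would look different.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for JsonInputFormat; only the fields needed for the illustration.
final class FormatSketch
{
  private final boolean keepNullColumns;
  private final boolean lineSplittable;

  FormatSketch(boolean keepNullColumns, boolean lineSplittable)
  {
    this.keepNullColumns = keepNullColumns;
    this.lineSplittable = lineSplittable;
  }

  boolean isLineSplittable()
  {
    return lineSplittable;
  }

  // Returns a copy with the new value; the receiver is never mutated.
  FormatSketch withLineSplittable(boolean lineSplittable)
  {
    return new FormatSketch(keepNullColumns, lineSplittable);
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (!(o instanceof FormatSketch)) {
      return false;
    }
    FormatSketch that = (FormatSketch) o;
    return keepNullColumns == that.keepNullColumns && lineSplittable == that.lineSplittable;
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(keepNullColumns, lineSplittable);
  }

  public static void main(String[] args)
  {
    FormatSketch original = new FormatSketch(false, true);
    Set<FormatSketch> formats = new HashSet<>();
    formats.add(original);
    FormatSketch changed = original.withLineSplittable(false);
    // The original is untouched, so it can still be found in the set.
    System.out.println(formats.contains(original));
    System.out.println(original.equals(changed));
  }
}
```

Because every field is final, the hash code of an instance already stored in a `HashSet` cannot change, which is what makes it safe to include `lineSplittable` in `equals()` and `hashCode()`.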

##########
File path: core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -96,23 +99,35 @@ public void close() throws IOException
   public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
   {
     return intermediateRowIterator().map(row -> {
-      final Map<String, Object> rawColumns;
+
+      final List<Map<String, Object>> rawColumnsList;
       try {
-        rawColumns = toMap(row);
+        rawColumnsList = toMap(row);
       }
       catch (Exception e) {
-        return InputRowListPlusRawValues.of(null, new ParseException(e, "Unable to parse row [%s] into JSON", row));
+        return Collections.singletonList(InputRowListPlusRawValues.of(null,
+                                                                      new ParseException(e, "Unable to parse row [%s] into JSON", row)));
       }
+
+      List<InputRow> rows;
       try {
-        return InputRowListPlusRawValues.of(parseInputRows(row), rawColumns);
+        rows = parseInputRows(row);
       }
       catch (ParseException e) {
-        return InputRowListPlusRawValues.of(rawColumns, e);
+        return Collections.singletonList(InputRowListPlusRawValues.of(rawColumnsList.isEmpty() ? null : rawColumnsList.get(0), e));

Review comment:
       There are some tests failing in `InputSourceSamplerTest`. It seems to be because of this change.

##########
File path: core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -96,23 +99,35 @@ public void close() throws IOException
   public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
   {
     return intermediateRowIterator().map(row -> {
-      final Map<String, Object> rawColumns;
+
+      final List<Map<String, Object>> rawColumnsList;
       try {
-        rawColumns = toMap(row);
+        rawColumnsList = toMap(row);
       }
       catch (Exception e) {
-        return InputRowListPlusRawValues.of(null, new ParseException(e, "Unable to parse row [%s] into JSON", row));
+        return Collections.singletonList(InputRowListPlusRawValues.of(null,
+                                                                      new ParseException(e, "Unable to parse row [%s] into JSON", row)));
       }
+
+      List<InputRow> rows;
       try {
-        return InputRowListPlusRawValues.of(parseInputRows(row), rawColumns);
+        rows = parseInputRows(row);
       }
       catch (ParseException e) {
-        return InputRowListPlusRawValues.of(rawColumns, e);
+        return Collections.singletonList(InputRowListPlusRawValues.of(rawColumnsList.isEmpty() ? null : rawColumnsList.get(0), e));

Review comment:
       There are some tests failing in `InputSourceSamplerTest`. It seems to be because of this change.






[GitHub] [druid] jihoonson commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r491620988



##########
File path: core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
##########
@@ -41,6 +42,20 @@
   private final ObjectMapper objectMapper;
   private final boolean keepNullColumns;
 
+  /**
+   * <pre>
+   * This parameter is introduced to support json string of an object in multiple lines in streaming ingestion records
+   *
+   * It indicates whether the input text can be splitted into lines in first, which will then be parsed into JSON objects one by one independently.

Review comment:
       typo: splitted -> split.

##########
File path: core/src/main/java/org/apache/druid/data/input/ExceptionThrowingIterator.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+public class ExceptionThrowingIterator<T> implements CloseableIterator<T>

Review comment:
       As this class is extracted now, please add some Javadoc describing what this class does and where it is used.

##########
File path: core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
##########
@@ -33,13 +40,98 @@
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
-public class JsonReader extends TextReader
+/**
+ * <pre>
+ * In constract to {@link JsonLineReader} which processes input text line by line independently,
+ * this class tries to parse the input text as a whole to an array of objects.
+ *
+ * The input text can be:
+ * 1. a JSON string of an object in a line or multiple lines(such as pretty-printed JSON text)
+ * 2. multiple JSON object strings concated by white space character(s)
+ *
+ * For case 2, what should be noticed is that if an exception is thrown when parsing one JSON string,
+ * the rest JSON text will all be ignored
+ *
+ * For more information, see: https://github.com/apache/druid/pull/10383
+ * </pre>
+ */
+public class JsonReader implements InputEntityReader

Review comment:
       Thinking about this new reader, I think the new requirement for parsing multiple JSON objects into multiple rows doesn't fit the current interface of the sampler. The sampler currently assumes that _there is only one JSON object in an input chunk_, which could contain either an array or a nested object. That means the current interface allows multiple output rows, as `inputRows` is a list of `InputRow`s in `InputRowListPlusRawValues`, but doesn't allow multiple values in an input chunk, as `rawValues` is a map in `InputRowListPlusRawValues`. To support the requirement, I think `rawValues` should be a list of maps, so that all raw values in an input chunk can be returned to the sampler.
   
   If we do this, `JsonReader` can simply extend `IntermediateRowParsingReader`, where the intermediate row will be the [`inputText` created from `source.open()`](https://github.com/apache/druid/pull/10383/files#diff-cc591396000acab135e9bd4aa1f31e2aR72). Here is an example.
   
   ```java
   public class JsonReader2 extends IntermediateRowParsingReader<String>
   {
     private final ObjectFlattener<JsonNode> flattener;
     private final ObjectMapper mapper;
     private final InputEntity source;
     private final InputRowSchema inputRowSchema;
   
     JsonReader2(
         InputRowSchema inputRowSchema,
         InputEntity source,
         JSONPathSpec flattenSpec,
         ObjectMapper mapper,
         boolean keepNullColumns
     )
     {
       this.inputRowSchema = inputRowSchema;
       this.source = source;
       this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns));
       this.mapper = mapper;
     }
   
     @Override
     protected CloseableIterator<String> intermediateRowIterator() throws IOException
     {
       return CloseableIterators.withEmptyBaggage(
           Iterators.singletonIterator(IOUtils.toString(source.open(), StringUtils.UTF8_STRING))
       );
     }
   
     @Override
     protected List<InputRow> parseInputRows(String intermediateRow) throws IOException, ParseException
     {
       try (JsonParser parser = new JsonFactory().createParser(intermediateRow)) {
         final Iterator<JsonNode> delegate = mapper.readValues(parser, JsonNode.class);
         return FluentIterable.from(() -> delegate)
                              .transform(jsonNode -> MapInputRowParser.parse(inputRowSchema, flattener.flatten(jsonNode)))
                              .toList();
       }
     }
   
     @Override
     protected List<Map<String, Object>> toMap(String intermediateRow) throws IOException
     {
       try (JsonParser parser = new JsonFactory().createParser(intermediateRow)) {
         final Iterator<Map> delegate = mapper.readValues(parser, Map.class);
         return FluentIterable.from(() -> delegate).transform(map -> (Map<String, Object>) map).toList();
       }
     }
   }
   ```

##########
File path: core/src/main/java/org/apache/druid/data/input/ExceptionThrowingIterator.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+public class ExceptionThrowingIterator<T> implements CloseableIterator<T>

Review comment:
       If `JsonReader` extends `IntermediateRowParsingReader`, this class will not have to be extracted.

##########
File path: core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java
##########
@@ -126,138 +149,158 @@ public void testParseRowWithConditional() throws IOException
     try (CloseableIterator<InputRow> iterator = reader.read()) {
       int numActualIterations = 0;
       while (iterator.hasNext()) {
+
         final InputRow row = iterator.next();
-        Assert.assertEquals("test", Iterables.getOnlyElement(row.getDimension("bar")));
-        Assert.assertEquals(Collections.emptyList(), row.getDimension("foo"));
-        Assert.assertTrue(row.getDimension("baz").isEmpty());
-        numActualIterations++;
-      }
-      Assert.assertEquals(numExpectedIterations, numActualIterations);
-    }
-  }
 
-  @Test
-  public void testParseRowKeepNullColumns() throws IOException
-  {
-    final JsonInputFormat format = new JsonInputFormat(
-        new JSONPathSpec(
-            true,
-            ImmutableList.of(
-                new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg")
-            )
-        ),
-        null,
-        true
-    );
+        Assert.assertEquals(DateTimes.of("2019-01-01"), row.getTimestamp());
+        Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
+        Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
+        Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
+        Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
+        Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
 
-    final ByteEntity source = new ByteEntity(
-        StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"o\":{\"mg\":null}}")
-    );
+        Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
+        Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
+        Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
 
-    final InputEntityReader reader = format.createReader(
-        new InputRowSchema(
-            new TimestampSpec("timestamp", "iso", null),
-            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Collections.emptyList())),
-            Collections.emptyList()
-        ),
-        source,
-        null
-    );
-    final int numExpectedIterations = 1;
-    try (CloseableIterator<InputRow> iterator = reader.read()) {
-      int numActualIterations = 0;
-      while (iterator.hasNext()) {
-        final InputRow row = iterator.next();
-        Assert.assertEquals(Arrays.asList("path_omg", "timestamp", "bar", "foo"), row.getDimensions());
-        Assert.assertTrue(row.getDimension("bar").isEmpty());
-        Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
-        Assert.assertTrue(row.getDimension("path_omg").isEmpty());
         numActualIterations++;
       }
+
       Assert.assertEquals(numExpectedIterations, numActualIterations);
     }
   }
 
   @Test
-  public void testKeepNullColumnsWithNoNullValues() throws IOException
+  public void testInvalidJSONText() throws IOException
   {
     final JsonInputFormat format = new JsonInputFormat(
         new JSONPathSpec(
             true,
             ImmutableList.of(
-                new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg")
+                new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"),
+                new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"),
+                new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"),
+                new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"),
+                new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"),
+                new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
             )
         ),
         null,
-        true
+        null
     );
 
+    //make sure JsonReader is used
+    format.setLineSplittable(false);
+
     final ByteEntity source = new ByteEntity(
-        StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":1,\"foo\":\"x\",\"o\":{\"mg\":\"a\"}}")
+        StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}"
+                           + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4xxx,\"o\":{\"mg\":2}}" //baz property is illegal
+                           + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":3}}")
     );
 
     final InputEntityReader reader = format.createReader(
         new InputRowSchema(
             new TimestampSpec("timestamp", "iso", null),
-            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Collections.emptyList())),
+            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
             Collections.emptyList()
         ),
         source,
         null
     );
+
+    // the 2nd line is ill-formed, it stops to iterate to the 3rd line.
+    // So in total, only 1 lines has been parsed
     final int numExpectedIterations = 1;
+
     try (CloseableIterator<InputRow> iterator = reader.read()) {
       int numActualIterations = 0;
       while (iterator.hasNext()) {
-        final InputRow row = iterator.next();
-        Assert.assertEquals(Arrays.asList("path_omg", "timestamp", "bar", "foo"), row.getDimensions());
-        Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("bar")));
-        Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
-        Assert.assertEquals("a", Iterables.getOnlyElement(row.getDimension("path_omg")));
-        numActualIterations++;
+
+        try {
+          final InputRow row = iterator.next();
+
+          final String msgId = String.valueOf(++numActualIterations);
+          Assert.assertEquals(DateTimes.of("2019-01-01"), row.getTimestamp());
+          Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
+          Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
+          Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
+          Assert.assertEquals(msgId, Iterables.getOnlyElement(row.getDimension("path_omg")));
+          Assert.assertEquals(msgId, Iterables.getOnlyElement(row.getDimension("jq_omg")));
+
+          Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
+          Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
+          Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
+        }
+        catch (Exception e) {
+          //ignore the exception when parsing the 2nd

Review comment:
       Please verify that `e` is the exception we expect.

##########
File path: core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
##########
@@ -41,6 +42,20 @@
   private final ObjectMapper objectMapper;
   private final boolean keepNullColumns;
 
+  /**
+   * <pre>

Review comment:
       The `pre` tag is for pre-formatted text such as source code. I think you don't have to use it in this case.

##########
File path: core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
##########
@@ -33,13 +40,98 @@
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
-public class JsonReader extends TextReader
+/**
+ * <pre>

Review comment:
       Same comment here. You don't need a `pre` tag.

##########
File path: core/src/main/java/org/apache/druid/data/input/ExceptionThrowingIterator.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+public class ExceptionThrowingIterator<T> implements CloseableIterator<T>
+{
+  private final RuntimeException exception;
+
+  private boolean thrown = false;
+
+  public ExceptionThrowingIterator(Throwable exception)
+  {
+    this.exception = exception instanceof RuntimeException
+                     ? (RuntimeException) exception
+                     : new RuntimeException(exception);
+  }
+
+  @Override
+  public boolean hasNext()
+  {
+    return !thrown;
+  }
+
+  @Override
+  public T next()
+  {
+    thrown = true;

Review comment:
       Hmm, SpotBugs flags this because `next()` can never throw `NoSuchElementException`. The warning is a false positive, since `next()` is never called more than once, but let's make SpotBugs happy by simply adding `if (!hasNext()) throw new NoSuchElementException()` here.
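   
   For illustration, a minimal sketch of that guard. It assumes `next()` rethrows the stored exception after setting `thrown`, which the hunk above does not show. The class name `ThrowOnceIterator` is hypothetical, and it implements `java.util.Iterator` rather than Druid's `CloseableIterator` only to stay self-contained.
   
   ```java
   import java.util.Iterator;
   import java.util.NoSuchElementException;

   // Hypothetical, simplified stand-in for ExceptionThrowingIterator.
   class ThrowOnceIterator<T> implements Iterator<T>
   {
     private final RuntimeException exception;
     private boolean thrown = false;

     ThrowOnceIterator(Throwable exception)
     {
       // Wrap checked exceptions so they can be thrown from next().
       this.exception = exception instanceof RuntimeException
                        ? (RuntimeException) exception
                        : new RuntimeException(exception);
     }

     @Override
     public boolean hasNext()
     {
       return !thrown;
     }

     @Override
     public T next()
     {
       // Guard required by the Iterator contract: no element left after the first call.
       if (!hasNext()) {
         throw new NoSuchElementException();
       }
       thrown = true;
       // Assumption: the real class rethrows the stored exception here.
       throw exception;
     }
   }
   ```
   
   With the guard in place, a second call to `next()` fails with `NoSuchElementException` instead of rethrowing the stored exception.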

##########
File path: core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
##########
@@ -73,7 +88,14 @@ public boolean isSplittable()
   @Override
   public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
   {
-    return new JsonReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns);
+    return this.lineSplittable ?
+           new JsonLineReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns) :
+           new JsonReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns);
+  }
+
+  public void setLineSplittable(boolean lineSplittable)

Review comment:
       This doesn't look pretty, but it is simple enough to handle this exceptional case. Maybe we could introduce another layer on top of inputFormat to make it prettier, but I don't think it would be worth it, at least at this point.

##########
File path: core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
##########
@@ -41,6 +42,20 @@
   private final ObjectMapper objectMapper;
   private final boolean keepNullColumns;
 
+  /**
+   * <pre>

Review comment:
       Hmm, I think the Javadoc should be clearer about what this flag means. How about rephrasing it as below?
   
   ```
   This parameter indicates whether or not the given InputEntity should be split by lines before parsing it. If it is set to true, the InputEntity must be split by lines first. If it is set to false, unlike what you could imagine, it means that the InputEntity doesn't have to be split by lines first, but it can still contain multiple lines. A created InputEntityReader from this format will determine by itself if line splitting is necessary.
   
   This parameter should always be true for batch ingestion and false for streaming ingestion. For more information, see: https://github.com/apache/druid/pull/10383.
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-691353141


   @FrankChen021 good catch. Option 2 among @gianm's suggestions sounds better to me. Option 1 sounds hard to actually implement since there will be lots of edge cases we would have to handle.
   
   Regarding integration tests, I think it would depend on what this PR is going to change. If it needs to modify some behavior in interaction between supervisor and tasks (for example, if the supervisor should set the `lineSplittable` flag properly), we need some. If the change can be contained only on the task side, I think adding some new unit tests in `KafkaIndexTask` would be enough. @suneet-s what do you think?




[GitHub] [druid] FrankChen021 edited a comment on pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 edited a comment on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-691185618


   Once a JSON text line is ill-formed, the `next` call on the iterator returned by `ObjectMapper.readValues` throws an exception, and it seems we have no opportunity to process the remaining lines. This is why some old test cases fail.
   
   I don't know whether Jackson provides any way to skip the ill-formed text, but I guess there's no such API for a parser.
   So what's your suggestion on this problem? @gianm @jihoonson






[GitHub] [druid] jihoonson commented on pull request #10383: [WIP]Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-694394202


   @FrankChen021 hmm, do you think it would be better to change the behavior of the sampler? Maybe its behavior should match that of the actual data reader.




[GitHub] [druid] FrankChen021 commented on pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-691185618


   Once a JSON text line is ill-formed, the `next` call on the iterator returned by `ObjectMapper.readValues` throws an exception, and it seems we have no opportunity to process the remaining lines. This is why some old test cases fail.
   
   I don't know whether Jackson provides any way to skip the ill-formed text, but I guess there's no such API for a parser.
   So what's your suggestion on this problem? @gianm @jihoonson




[GitHub] [druid] jihoonson commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r510469802



##########
File path: core/src/main/java/org/apache/druid/data/input/InputRowListPlusRawValues.java
##########
@@ -82,8 +121,16 @@ private InputRowListPlusRawValues(
     return inputRows;
   }
 
+  /**
+   * This method is left here only for test cases
+   */
   @Nullable
   public Map<String, Object> getRawValues()
+  {
+    return CollectionUtils.isNullOrEmpty(rawValues) ? null : rawValues.get(0);

Review comment:
       nit: it would be better to use `Iterables.getOnlyElement(rawValues)` to make sure that you are getting the only element.
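   
   To make the difference concrete, here is a rough stdlib-only sketch. `OnlyElementSketch` and its helper are hypothetical stand-ins that mimic what Guava's `Iterables.getOnlyElement` does for a list. The point is that `get(0)` silently ignores extra elements, while an "only element" check fails loudly.
   
   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.NoSuchElementException;

   class OnlyElementSketch
   {
     // Hypothetical stand-in for Guava's Iterables.getOnlyElement, restricted to lists.
     static <T> T getOnlyElement(List<T> list)
     {
       if (list.isEmpty()) {
         throw new NoSuchElementException();
       }
       if (list.size() > 1) {
         throw new IllegalArgumentException("expected one element but found " + list.size());
       }
       return list.get(0);
     }

     // Mirrors the shape of getRawValues(): null for a null or empty list,
     // otherwise the single element, rejecting lists with more than one entry.
     static Map<String, Object> getRawValues(List<Map<String, Object>> rawValues)
     {
       return rawValues == null || rawValues.isEmpty() ? null : getOnlyElement(rawValues);
     }
   }
   ```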

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
##########
@@ -131,17 +130,27 @@ public SamplerResponse sample(
             continue;
           }
 
-          for (InputRow row : inputRowListPlusRawValues.getInputRows()) {
-            index.add(new SamplerInputRow(row, counter), true);
+          for (int i = 0; i < rawColumnsList.size(); i++) {
+            Map<String, Object> rawColumns = rawColumnsList.get(i);
+            InputRow row = inputRowListPlusRawValues.getInputRows().get(i);
+
+            //keep the index of the row to be added to responseRows for further use
+            final int rowIndex = responseRows.size();
+            index.add(new SamplerInputRow(row, rowIndex), true);
+
             // store the raw value; will be merged with the data from the IncrementalIndex later
-            responseRows[counter] = new SamplerResponseRow(rawColumns, null, null, null);
-            counter++;
+            responseRows.add(new SamplerResponseRow(rawColumns, null, null, null));
             numRowsIndexed++;
           }
         }
         catch (ParseException e) {
-          responseRows[counter] = new SamplerResponseRow(rawColumns, null, true, e.getMessage());
-          counter++;
+          if (rawColumnsList != null) {
+            responseRows.addAll(rawColumnsList.stream()
+                                              .map(rawColumns -> new SamplerResponseRow(rawColumns, null, true, e.getMessage()))
+                                              .collect(Collectors.toList()));

Review comment:
       Can this introduce duplicate rows in `responseRows`? Suppose you have 3 rawColumns in `rawColumnsList`, including an unparseable row at the second position. The first row will be added to `index`, and thus a new `SamplerResponseRow` for the first `rawColumns` will be added to `responseRows`. But for the second row, `index.add()` will throw a `ParseException`, which will execute these lines. In this case, the `rawColumns` of the first row will be added to `responseRows` twice.
   
   Looking at [what `JsonReader` does](https://github.com/apache/druid/pull/10383/files#diff-c745a28591c789098f0a9efd75769ad242b6c40379d155fc61143ed63dccc995R93-R108), it seems to throw away the whole `intermediateRow` when there is any unparseable row. The sampler behavior should match the actual ingestion.
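   
   The scenario can be reproduced with a small stdlib-only model. Everything here is hypothetical: rows are plain strings, `"bad"` stands for the unparseable row, `addToIndex` stands in for `index.add()`, and only the response list is modeled (not the `IncrementalIndex`). `sampleAllOrNothing` is just one possible shape that reports the whole block as failed, not necessarily the fix this PR should adopt.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;

   class SamplerDuplicateSketch
   {
     // Stand-in for index.add(): fails on the unparseable row.
     static void addToIndex(String rawColumns)
     {
       if ("bad".equals(rawColumns)) {
         throw new IllegalArgumentException("unparseable row: " + rawColumns);
       }
     }

     // Same pattern as the diff: rows are appended one by one, and on failure
     // the whole raw list is appended again as error rows.
     static List<String> sampleAsInDiff(List<String> rawColumnsList)
     {
       final List<String> responseRows = new ArrayList<>();
       try {
         for (String rawColumns : rawColumnsList) {
           addToIndex(rawColumns);
           responseRows.add(rawColumns);
         }
       }
       catch (IllegalArgumentException e) {
         for (String rawColumns : rawColumnsList) {
           responseRows.add(rawColumns + " (unparseable)");
         }
       }
       return responseRows;
     }

     // Possible alternative: drop the rows already appended for this block
     // before reporting every row of the block as failed.
     static List<String> sampleAllOrNothing(List<String> rawColumnsList)
     {
       final List<String> responseRows = new ArrayList<>();
       final int sizeBeforeBlock = responseRows.size();
       try {
         for (String rawColumns : rawColumnsList) {
           addToIndex(rawColumns);
           responseRows.add(rawColumns);
         }
       }
       catch (IllegalArgumentException e) {
         responseRows.subList(sizeBeforeBlock, responseRows.size()).clear();
         for (String rawColumns : rawColumnsList) {
           responseRows.add(rawColumns + " (unparseable)");
         }
       }
       return responseRows;
     }
   }
   ```
   
   For the input `["r1", "bad", "r3"]`, the first method returns `r1` twice (once as a good row and once as an error row), while the second returns each raw row exactly once.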

##########
File path: core/src/main/java/org/apache/druid/data/input/InputRowListPlusRawValues.java
##########
@@ -53,21 +54,59 @@ public static InputRowListPlusRawValues of(@Nullable InputRow inputRow, Map<Stri
 
   public static InputRowListPlusRawValues of(@Nullable List<InputRow> inputRows, Map<String, Object> rawColumns)
   {
-    return new InputRowListPlusRawValues(inputRows, Preconditions.checkNotNull(rawColumns, "rawColumns"), null);
+    return new InputRowListPlusRawValues(inputRows,
+                                         Collections.singletonList(Preconditions.checkNotNull(rawColumns, "rawColumns")),
+                                         null);
   }
 
   public static InputRowListPlusRawValues of(@Nullable Map<String, Object> rawColumns, ParseException parseException)
   {
     return new InputRowListPlusRawValues(
         null,
-        rawColumns,
+        rawColumns == null ? null : Collections.singletonList(rawColumns),
         Preconditions.checkNotNull(parseException, "parseException")
     );
   }
 
+  public static InputRowListPlusRawValues ofList(@Nullable List<Map<String, Object>> rawColumnsList, ParseException parseException)
+  {
+    return ofList(rawColumnsList, null, parseException);
+  }
+
+  /**
+   * Create an instance of {@link InputRowListPlusRawValues}
+   *
+   * Make sure the size of given rawColumnsList and inputRows are the same if both of them are not null
+   */
+  public static InputRowListPlusRawValues ofList(@Nullable List<Map<String, Object>> rawColumnsList,
+                                                 @Nullable List<InputRow> inputRows)
+  {
+    return ofList(rawColumnsList, inputRows, null);
+  }
+
+  /**
+   * Create an instance of {@link InputRowListPlusRawValues}
+   *
+   * Make sure the size of given rawColumnsList and inputRows are the same if both of them are not null
+   */
+  public static InputRowListPlusRawValues ofList(@Nullable List<Map<String, Object>> rawColumnsList,
+                                                 @Nullable List<InputRow> inputRows,
+                                                 ParseException parseException)

Review comment:
       `parseException` can be null here, so it should be annotated with `@Nullable`.






[GitHub] [druid] jihoonson commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r502079368



##########
File path: core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java
##########
@@ -69,7 +69,7 @@ public void testEquals()
               new ObjectMapper(),
               new ObjectMapper()
               )
-              .withIgnoredFields("objectMapper")
+              .withIgnoredFields("objectMapper", "lineSplittable")

Review comment:
       Hmm, good point. Yeah, it's not a good convention to include a mutable field in `equals()` or `hashCode()` because such objects can be used as keys of a `HashMap` or elements of a `HashSet`. I would suggest making `lineSplittable` immutable and, instead of `setLineSplittable()`, adding a new method `withLineSplittable(boolean lineSplittable)` that creates a new `JsonInputFormat` instance with the given value.
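   
   Roughly what I have in mind, as a stripped-down sketch. `JsonFormatSketch` is a hypothetical stand-in that keeps only the two boolean fields; the real `JsonInputFormat` constructor takes more arguments.
   
   ```java
   import java.util.Objects;

   // Hypothetical, minimal stand-in for JsonInputFormat.
   final class JsonFormatSketch
   {
     private final boolean keepNullColumns;
     private final boolean lineSplittable;

     JsonFormatSketch(boolean keepNullColumns, boolean lineSplittable)
     {
       this.keepNullColumns = keepNullColumns;
       this.lineSplittable = lineSplittable;
     }

     // Returns a copy with the new flag value instead of mutating this instance.
     JsonFormatSketch withLineSplittable(boolean lineSplittable)
     {
       return new JsonFormatSketch(keepNullColumns, lineSplittable);
     }

     boolean isLineSplittable()
     {
       return lineSplittable;
     }

     // Safe to include lineSplittable here because the field is final.
     @Override
     public boolean equals(Object o)
     {
       if (this == o) {
         return true;
       }
       if (!(o instanceof JsonFormatSketch)) {
         return false;
       }
       final JsonFormatSketch that = (JsonFormatSketch) o;
       return keepNullColumns == that.keepNullColumns && lineSplittable == that.lineSplittable;
     }

     @Override
     public int hashCode()
     {
       return Objects.hash(keepNullColumns, lineSplittable);
     }
   }
   ```
   
   With this shape, `lineSplittable` no longer needs to be excluded in `testEquals()`, since the hash of an instance can never change after it is put into a set or map.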






[GitHub] [druid] jihoonson commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r502078014



##########
File path: core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -96,23 +99,35 @@ public void close() throws IOException
   public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
   {
     return intermediateRowIterator().map(row -> {
-      final Map<String, Object> rawColumns;
+
+      final List<Map<String, Object>> rawColumnsList;
       try {
-        rawColumns = toMap(row);
+        rawColumnsList = toMap(row);
       }
       catch (Exception e) {
-        return InputRowListPlusRawValues.of(null, new ParseException(e, "Unable to parse row [%s] into JSON", row));
+        return Collections.singletonList(InputRowListPlusRawValues.of(null,
+                                                                      new ParseException(e, "Unable to parse row [%s] into JSON", row)));
       }
+
+      List<InputRow> rows;
       try {
-        return InputRowListPlusRawValues.of(parseInputRows(row), rawColumns);
+        rows = parseInputRows(row);
       }
       catch (ParseException e) {
-        return InputRowListPlusRawValues.of(rawColumns, e);
+        return Collections.singletonList(InputRowListPlusRawValues.of(rawColumnsList.isEmpty() ? null : rawColumnsList.get(0), e));

Review comment:
       > I checked the code again and find that there's no need to return raw column object when exception occurs. Because `InputSourceSample` re-throws exception once `InputRowListPlusRawValues.getParseException` returns exception.
   
   [Actually, there is a clause catching `ParseException`](https://github.com/apache/druid/blob/master/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java#L142-L146) in the sampler (this is a bad pattern by the way. We should refactor it at some point). In this case, the `rawColumns` is returned to show the raw data in the UI. I think the entire `rawColumnsList` should be returned to keep the current behaviour.






[GitHub] [druid] FrankChen021 commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r520275503



##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
##########
@@ -1060,6 +1062,107 @@ public void testWithFilter() throws IOException
     );
   }
 
+  @Test
+  public void testIndexParseException() throws IOException

Review comment:
       A block of multiple JSON strings is only supported in stream ingestion, as we discussed above, which is also why the test cases are added in the kafka-indexing-service module. [It's implemented by setting the new property on `JsonInputFormat` in `RecordSupplierInputSource`](https://github.com/apache/druid/pull/10383/files#diff-72b4a411d4ce193935a708df516b00f94827e9563ba7adf00c9e0be479435b5bR106).
   
   For the `InputSourceSampler` test cases here, the input source is `InlineInputSource`, which always uses `InputEntityIteratingReader` even when the input format is `JsonInputFormat`, so there's no chance to test that. So I think we don't need to add test cases here.






[GitHub] [druid] jihoonson commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r503577153



##########
File path: core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -96,23 +101,45 @@ public void close() throws IOException
   public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
   {
     return intermediateRowIterator().map(row -> {
-      final Map<String, Object> rawColumns;
+
+      final List<Map<String, Object>> rawColumnsList;
       try {
-        rawColumns = toMap(row);
+        rawColumnsList = toMap(row);
       }
       catch (Exception e) {
-        return InputRowListPlusRawValues.of(null, new ParseException(e, "Unable to parse row [%s] into JSON", row));
+        return Collections.singletonList(InputRowListPlusRawValues.of(null,
+                                                                      new ParseException(e, "Unable to parse row [%s] into JSON", row)));
+      }
+
+      if (CollectionUtils.isNullOrEmpty(rawColumnsList)) {
+        return Collections.singletonList(InputRowListPlusRawValues.of(null,
+                                                                      new ParseException("No map object parsed for row [%s]", row)));
       }
+
+      List<InputRow> rows;
       try {
-        return InputRowListPlusRawValues.of(parseInputRows(row), rawColumns);
+        rows = parseInputRows(row);
       }
       catch (ParseException e) {
-        return InputRowListPlusRawValues.of(rawColumns, e);
+        return rawColumnsList.stream().map(rawColumn -> InputRowListPlusRawValues.of(rawColumn, e)).collect(Collectors.toList());
       }
       catch (IOException e) {
-        return InputRowListPlusRawValues.of(rawColumns, new ParseException(e, "Unable to parse row [%s] into inputRow", row));
+        ParseException exception = new ParseException(e, "Unable to parse row [%s] into inputRow", row);
+        return rawColumnsList.stream().map(rawColumn -> InputRowListPlusRawValues.of(rawColumn, exception)).collect(Collectors.toList());
+      }
+
+      if (rows.size() != rawColumnsList.size()) {
+        return Collections.singletonList(InputRowListPlusRawValues.of(null,

Review comment:
       I think we should set both `rows` and `rawColumnsList` in `InputRowListPlusRawValues` in this case, so that users will learn how they are different.

##########
File path: core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -96,23 +101,45 @@ public void close() throws IOException
   public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
   {
     return intermediateRowIterator().map(row -> {
-      final Map<String, Object> rawColumns;
+
+      final List<Map<String, Object>> rawColumnsList;
       try {
-        rawColumns = toMap(row);
+        rawColumnsList = toMap(row);
       }
       catch (Exception e) {
-        return InputRowListPlusRawValues.of(null, new ParseException(e, "Unable to parse row [%s] into JSON", row));
+        return Collections.singletonList(InputRowListPlusRawValues.of(null,
+                                                                      new ParseException(e, "Unable to parse row [%s] into JSON", row)));
+      }
+
+      if (CollectionUtils.isNullOrEmpty(rawColumnsList)) {
+        return Collections.singletonList(InputRowListPlusRawValues.of(null,
+                                                                      new ParseException("No map object parsed for row [%s]", row)));
       }
+
+      List<InputRow> rows;
       try {
-        return InputRowListPlusRawValues.of(parseInputRows(row), rawColumns);
+        rows = parseInputRows(row);
       }
       catch (ParseException e) {
-        return InputRowListPlusRawValues.of(rawColumns, e);
+        return rawColumnsList.stream().map(rawColumn -> InputRowListPlusRawValues.of(rawColumn, e)).collect(Collectors.toList());
       }
       catch (IOException e) {
-        return InputRowListPlusRawValues.of(rawColumns, new ParseException(e, "Unable to parse row [%s] into inputRow", row));
+        ParseException exception = new ParseException(e, "Unable to parse row [%s] into inputRow", row);
+        return rawColumnsList.stream().map(rawColumn -> InputRowListPlusRawValues.of(rawColumn, exception)).collect(Collectors.toList());
+      }

Review comment:
       This still doesn't seem correct to me. The purpose of returning `rawColumn` and `exception` in `InputRowListPlusRawValues` is to show them together in the sampler, so that users can debug their data and input format easily. To do that, we should show exactly what the data was when the `ParseException` was thrown. With the new behavior of parsing a list of JSONs into `InputRow`s, `row` will contain the list of JSONs here. If a `ParseException` is thrown while parsing one of them, we should return a single `InputRowListPlusRawValues` that contains `row` (the whole list of JSONs) and the exception, because we don't know exactly which JSON in the list the `ParseException` was thrown from. For this, I still think we should change `rawValues` in `InputRowListPlusRawValues` to be `List<Map<String, Object>>`. I understand that you don't want to touch it as it is widely used in unit tests, but I'm not sure we can fix this without touching it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] FrankChen021 commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r494728667



##########
File path: core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
##########
@@ -33,13 +40,98 @@
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
-public class JsonReader extends TextReader
+/**
+ * <pre>
+ * In contrast to {@link JsonLineReader}, which processes input text line by line independently,
+ * this class tries to parse the input text as a whole to an array of objects.
+ *
+ * The input text can be:
+ * 1. a JSON string of an object in a line or multiple lines (such as pretty-printed JSON text)
+ * 2. multiple JSON object strings concatenated by white space character(s)
+ *
+ * For case 2, note that if an exception is thrown when parsing one JSON string,
+ * the rest of the JSON text will be ignored
+ *
+ * For more information, see: https://github.com/apache/druid/pull/10383
+ * </pre>
+ */
+public class JsonReader implements InputEntityReader

Review comment:
       > The sampler currently assumes that there is only one JSON object in an input chunk which could have either an array or a nested object. 
   
   That's the reason why `ExceptionThrowingIterator` was extracted and `JsonReader` implements `InputEntityReader` directly.
   
   Your suggestion provides a new and simple way to deal with it. I'll test the code later.






[GitHub] [druid] FrankChen021 commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r520275818



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
##########
@@ -111,38 +113,49 @@ public SamplerResponse sample(
     try (final CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample();
          final IncrementalIndex<Aggregator> index = buildIncrementalIndex(nonNullSamplerConfig, nonNullDataSchema);
          final Closer closer1 = closer) {
-      SamplerResponseRow[] responseRows = new SamplerResponseRow[nonNullSamplerConfig.getNumRows()];
-      int counter = 0, numRowsIndexed = 0;
-
-      while (counter < responseRows.length && iterator.hasNext()) {
-        Map<String, Object> rawColumns = null;
-        try {
-          final InputRowListPlusRawValues inputRowListPlusRawValues = iterator.next();
-
-          if (inputRowListPlusRawValues.getRawValues() != null) {
-            rawColumns = inputRowListPlusRawValues.getRawValues();
-          }
-
-          if (inputRowListPlusRawValues.getParseException() != null) {
-            throw inputRowListPlusRawValues.getParseException();
+      List<SamplerResponseRow> responseRows = new ArrayList<>(nonNullSamplerConfig.getNumRows());
+      int numRowsIndexed = 0;
+
+      while (responseRows.size() < nonNullSamplerConfig.getNumRows() && iterator.hasNext()) {
+        final InputRowListPlusRawValues inputRowListPlusRawValues = iterator.next();
+
+        final List<Map<String, Object>> rawColumnsList = inputRowListPlusRawValues.getRawValuesList();
+
+        final ParseException parseException = inputRowListPlusRawValues.getParseException();
+        if (parseException != null) {
+          if (rawColumnsList != null) {
+            // add all rows to response
+            responseRows.addAll(rawColumnsList.stream()
+                                              .map(rawColumns -> new SamplerResponseRow(rawColumns, null, true, parseException.getMessage()))
+                                              .collect(Collectors.toList()));

Review comment:
       Yes, returning the whole `rawColumnsList` involves some changes on the web-console side, which is currently beyond my ability. Thanks for understanding.






[GitHub] [druid] jihoonson commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r487371583



##########
File path: core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
##########
@@ -49,41 +55,99 @@
       boolean keepNullColumns
   )
   {
-    super(inputRowSchema, source);
+    this.inputRowSchema = inputRowSchema;
+    this.source = source;
     this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns));
     this.mapper = mapper;
   }
 
-  @Override
-  public List<InputRow> parseInputRows(String line) throws IOException, ParseException
-  {
-    final JsonNode document = mapper.readValue(line, JsonNode.class);
-    final Map<String, Object> flattened = flattener.flatten(document);
-    return Collections.singletonList(MapInputRowParser.parse(getInputRowSchema(), flattened));
-  }
 
   @Override
-  public Map<String, Object> toMap(String intermediateRow) throws IOException
+  public CloseableIterator<InputRow> read() throws IOException
   {
-    //noinspection unchecked
-    return mapper.readValue(intermediateRow, Map.class);
+    final MappingIterator<JsonNode> delegate = mapper.readValues(
+        new JsonFactory().createParser(this.source.open()),

Review comment:
       The `parser` created from `JsonFactory` is `Closeable` which should be closed when the below `CloseableIterator` is closed. 

##########
File path: core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
##########
@@ -49,41 +55,99 @@
       boolean keepNullColumns
   )
   {
-    super(inputRowSchema, source);
+    this.inputRowSchema = inputRowSchema;
+    this.source = source;
     this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns));
     this.mapper = mapper;
   }
 
-  @Override
-  public List<InputRow> parseInputRows(String line) throws IOException, ParseException
-  {
-    final JsonNode document = mapper.readValue(line, JsonNode.class);
-    final Map<String, Object> flattened = flattener.flatten(document);
-    return Collections.singletonList(MapInputRowParser.parse(getInputRowSchema(), flattened));
-  }
 
   @Override
-  public Map<String, Object> toMap(String intermediateRow) throws IOException
+  public CloseableIterator<InputRow> read() throws IOException
   {
-    //noinspection unchecked
-    return mapper.readValue(intermediateRow, Map.class);
+    final MappingIterator<JsonNode> delegate = mapper.readValues(
+        new JsonFactory().createParser(this.source.open()),

Review comment:
       I haven't looked through the PR yet, but found that the `parser` created from `JsonFactory` is `Closeable` which should be closed when the below `CloseableIterator` is closed. I will review again when you update the PR.
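
The point about closing the parser together with the iterator can be illustrated with a minimal sketch. It uses only the JDK and does not reproduce Druid's or Jackson's actual classes: `ClosingIterator`, `ResourceClosingIterator`, and `FakeParser` are hypothetical names introduced here, standing in for `CloseableIterator`, the wrapper returned by `read()`, and the parser created from `JsonFactory`.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical stand-in for a closeable iterator; not Druid's real interface.
interface ClosingIterator<T> extends Iterator<T>, Closeable
{
}

// Wraps a delegate iterator and closes the resource that feeds it.
final class ResourceClosingIterator<T> implements ClosingIterator<T>
{
  private final Iterator<T> delegate;
  private final Closeable resource;

  ResourceClosingIterator(Iterator<T> delegate, Closeable resource)
  {
    this.delegate = delegate;
    this.resource = resource;
  }

  @Override
  public boolean hasNext()
  {
    return delegate.hasNext();
  }

  @Override
  public T next()
  {
    if (!delegate.hasNext()) {
      throw new NoSuchElementException();
    }
    return delegate.next();
  }

  @Override
  public void close() throws IOException
  {
    // Closing the iterator releases the underlying resource as well.
    resource.close();
  }
}

// Fake parser (assumption for illustration) that only records whether it was closed.
final class FakeParser implements Closeable
{
  boolean closed = false;

  @Override
  public void close()
  {
    closed = true;
  }
}
```

With this shape, a caller that closes the iterator (for example in a try-with-resources block) also releases the parser, so no separate handle to the parser needs to be kept.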







[GitHub] [druid] jihoonson commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r513028037



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
##########
@@ -131,17 +130,27 @@ public SamplerResponse sample(
             continue;
           }
 
-          for (InputRow row : inputRowListPlusRawValues.getInputRows()) {
-            index.add(new SamplerInputRow(row, counter), true);
+          for (int i = 0; i < rawColumnsList.size(); i++) {
+            Map<String, Object> rawColumns = rawColumnsList.get(i);
+            InputRow row = inputRowListPlusRawValues.getInputRows().get(i);
+
+            //keep the index of the row to be added to responseRows for further use
+            final int rowIndex = responseRows.size();
+            index.add(new SamplerInputRow(row, rowIndex), true);
+
             // store the raw value; will be merged with the data from the IncrementalIndex later
-            responseRows[counter] = new SamplerResponseRow(rawColumns, null, null, null);
-            counter++;
+            responseRows.add(new SamplerResponseRow(rawColumns, null, null, null));
             numRowsIndexed++;
           }
         }
         catch (ParseException e) {
-          responseRows[counter] = new SamplerResponseRow(rawColumns, null, true, e.getMessage());
-          counter++;
+          if (rawColumnsList != null) {
+            responseRows.addAll(rawColumnsList.stream()
+                                              .map(rawColumns -> new SamplerResponseRow(rawColumns, null, true, e.getMessage()))
+                                              .collect(Collectors.toList()));

Review comment:
       > I notice that `index.add` does not throw `ParseException` but returns the exception in its returning result. The previous version of `InputSourceSampler` checks the result to rethrow this exception, while the [change](https://github.com/apache/druid/pull/10336/files#diff-a35198e960bc90b96d559e9ea93be1d4de52655e9a4f04016173114952ae0001) introduced by #10336 deletes these exception related code, which means the code here won't throw ParseException.
   > 
   > Of course, there's possibility that the code in for-loop throws `ParseException` in future. I've made little changes to avoid this potential situation to happen. Let's check it once the CI passes.
   
   Oops, good point. I forgot what I did in #10336. I think `index.add()` should never throw a `ParseException` directly, but should return it in `IncrementalIndexAddResult`. We can add this contract to the Javadoc of `IncrementalIndex.add()`. I suggest removing the `catch` clause here to avoid future confusion (there is no point in catching exceptions which cannot be thrown here).
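
The contract discussed above (report parse problems in the returned result instead of throwing) could look roughly like the following sketch. `AddResultSketch` and `IndexSketch` are hypothetical names used only for illustration; they are not the real `IncrementalIndexAddResult` or `IncrementalIndex`, and the parsing logic is a toy.

```java
// Hypothetical result type; it only illustrates "return the error, don't throw it".
final class AddResultSketch
{
  private final int rowCount;
  private final String parseError; // null when the row was added cleanly

  AddResultSketch(int rowCount, String parseError)
  {
    this.rowCount = rowCount;
    this.parseError = parseError;
  }

  int getRowCount()
  {
    return rowCount;
  }

  String getParseError()
  {
    return parseError;
  }

  boolean isOk()
  {
    return parseError == null;
  }
}

final class IndexSketch
{
  private int rowCount = 0;

  /**
   * Contract: never throws for unparseable input. Any parse problem is
   * reported through the returned result, so callers need no catch clause.
   */
  AddResultSketch add(String metricValue)
  {
    try {
      Long.parseLong(metricValue);
    }
    catch (NumberFormatException e) {
      return new AddResultSketch(rowCount, "Unable to parse value [" + metricValue + "] as long");
    }
    rowCount++;
    return new AddResultSketch(rowCount, null);
  }
}
```

A caller then checks `isOk()` on the result rather than wrapping the call in `try`/`catch`, which is why a `catch (ParseException e)` around such a call would be dead code.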

##########
File path: core/src/main/java/org/apache/druid/data/input/InputRowListPlusRawValues.java
##########
@@ -82,8 +121,16 @@ private InputRowListPlusRawValues(
     return inputRows;
   }
 
+  /**
+   * This method is left here only for test cases
+   */
   @Nullable
   public Map<String, Object> getRawValues()
+  {
+    return CollectionUtils.isNullOrEmpty(rawValues) ? null : rawValues.get(0);

Review comment:
       I meant that `rawValues.get(0)` should make sure there is only one element in the list whenever this method is called.
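
One way to read this suggestion is sketched below, under the assumption that failing fast is acceptable when the list holds more than one element. `RawValuesHolderSketch` is a hypothetical, simplified stand-in for `InputRowListPlusRawValues`, and the choice of `IllegalStateException` is an assumption rather than something stated in the review.

```java
import java.util.List;
import java.util.Map;

// Hypothetical holder; only the raw-values part of the real class is modeled.
final class RawValuesHolderSketch
{
  private final List<Map<String, Object>> rawValues; // may be null

  RawValuesHolderSketch(List<Map<String, Object>> rawValues)
  {
    this.rawValues = rawValues;
  }

  List<Map<String, Object>> getRawValuesList()
  {
    return rawValues;
  }

  // Single-value accessor: valid only when at most one raw value is present.
  Map<String, Object> getRawValues()
  {
    if (rawValues == null || rawValues.isEmpty()) {
      return null;
    }
    if (rawValues.size() != 1) {
      throw new IllegalStateException("Expected exactly one raw value but got " + rawValues.size());
    }
    return rawValues.get(0);
  }
}
```

The check keeps the legacy accessor from silently dropping all but the first element when a caller should have used the list accessor instead.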






[GitHub] [druid] FrankChen021 commented on pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-690976487


   CI reports some integration test failures; I'll check them later.




[GitHub] [druid] FrankChen021 edited a comment on pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 edited a comment on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-691386393


   @gianm @jihoonson Thanks for the suggestions. Option 2 is much easier for us to control. I would like to do it as follows.
   
   Since the parameter `lineSplittable` is introduced on `JsonInputFormat`, to keep the code easy to maintain there will be two JSON reader classes, `JsonReader` and `JsonLineReader`. The former directly implements `InputEntityReader` and uses `ObjectMapper.readValues` to parse the input text. `JsonLineReader` is the original `JsonReader` renamed; it inherits `TextReader` and processes JSON text line by line. `JsonInputFormat` instantiates one or the other based on the value of `lineSplittable`.
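
The selection described above can be sketched with JDK-only code. Everything here is hypothetical and simplified: `RecordSplitter`, `LineSplitterSketch`, `WholeTextSplitterSketch`, and `SplitterFactorySketch` are illustrative names, and the whole-text splitter uses a toy brace counter rather than `ObjectMapper.readValues`, which is what the PR actually relies on.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical abstraction: turns input text into one string per JSON object.
interface RecordSplitter
{
  List<String> split(String text);
}

// Line-delimited mode: every non-blank line is one record.
final class LineSplitterSketch implements RecordSplitter
{
  @Override
  public List<String> split(String text)
  {
    List<String> records = new ArrayList<>();
    for (String line : text.split("\n")) {
      if (!line.trim().isEmpty()) {
        records.add(line.trim());
      }
    }
    return records;
  }
}

// Whole-text mode: toy scanner that tracks brace depth outside string literals,
// so an object may span several lines (for example pretty-printed JSON).
final class WholeTextSplitterSketch implements RecordSplitter
{
  @Override
  public List<String> split(String text)
  {
    List<String> records = new ArrayList<>();
    int depth = 0;
    int start = -1;
    boolean inString = false;
    boolean escaped = false;
    for (int i = 0; i < text.length(); i++) {
      char c = text.charAt(i);
      if (inString) {
        if (escaped) {
          escaped = false;
        } else if (c == '\\') {
          escaped = true;
        } else if (c == '"') {
          inString = false;
        }
        continue;
      }
      if (c == '"') {
        inString = true;
      } else if (c == '{') {
        if (depth == 0) {
          start = i;
        }
        depth++;
      } else if (c == '}' && depth > 0) {
        depth--;
        if (depth == 0) {
          records.add(text.substring(start, i + 1));
        }
      }
    }
    return records;
  }
}

// Mirrors the idea of the input format choosing a reader from a boolean flag.
final class SplitterFactorySketch
{
  static RecordSplitter create(boolean lineSplittable)
  {
    return lineSplittable ? new LineSplitterSketch() : new WholeTextSplitterSketch();
  }
}
```

Keeping the two strategies in separate classes means the line-by-line path stays untouched for existing users, while the flag opts in to whole-text parsing.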




[GitHub] [druid] gianm commented on pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
gianm commented on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-691239531








[GitHub] [druid] suneet-s commented on pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
suneet-s commented on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-691219861


   @FrankChen021 I think we should add integration tests for this. Consider updating the data files in `integration-tests/src/test/resources/data/batch_index/json/` to include multi-line json
   
   The CI failures look legit. I re-triggered the failing integration test since it looked like it might have been flaky.




[GitHub] [druid] FrankChen021 edited a comment on pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 edited a comment on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-691185618








[GitHub] [druid] FrankChen021 commented on pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-711954685


   The branch has been rebased on master because there were some conflicts with it.
   
   @jihoonson Here is some explanation of the latest changes:
   
   1. A new method `InputRowListPlusRawValues.ofList` is added to create an instance of that class from a list of raw columns and rows, while the old `of` methods are kept mainly so that existing code does not need to change.
   
   2. `InputRowListPlusRawValues.getRawValues` is kept to avoid extensive changes to test cases.
   
   3. No changes are made to the existing `SamplerResponse` interface. All data is still returned row by row. I think keeping this interface contract is reasonable, because its inner container `SamplerResponseRow` is already a pair of `rawColumn` and parsed JSON map.
   




[GitHub] [druid] FrankChen021 commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r494730213



##########
File path: core/src/main/java/org/apache/druid/data/input/ExceptionThrowingIterator.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+public class ExceptionThrowingIterator<T> implements CloseableIterator<T>
+{
+  private final RuntimeException exception;
+
+  private boolean thrown = false;
+
+  public ExceptionThrowingIterator(Throwable exception)
+  {
+    this.exception = exception instanceof RuntimeException
+                     ? (RuntimeException) exception
+                     : new RuntimeException(exception);
+  }
+
+  @Override
+  public boolean hasNext()
+  {
+    return !thrown;
+  }
+
+  @Override
+  public T next()
+  {
+    thrown = true;

Review comment:
       I don't know why SpotBugs didn't report the problem before this class was extracted. But if we adopt the solution of making `JsonReader` inherit from `IntermediateRowParsingReader`, as you suggest, this modification should be rolled back, and I'll check again whether SpotBugs still reports the bug.
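
The hunk above is cut off at the commented line, so the rest of `next()` is not shown. A minimal sketch of the throw-once pattern it appears to implement, assuming a plain `java.util.Iterator` instead of Druid's `CloseableIterator`; the class name `ThrowOnceIterator` and the `NoSuchElementException` guard are assumptions for illustration, not the PR's code:

```java
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical stand-in for ExceptionThrowingIterator, built on java.util.Iterator only.
final class ThrowOnceIterator<T> implements Iterator<T>
{
  private final RuntimeException exception;
  private boolean thrown = false;

  ThrowOnceIterator(Throwable t)
  {
    // Checked exceptions are wrapped so that next() can rethrow them.
    this.exception = t instanceof RuntimeException ? (RuntimeException) t : new RuntimeException(t);
  }

  @Override
  public boolean hasNext()
  {
    // Reports one pending element so that the caller calls next() exactly once.
    return !thrown;
  }

  @Override
  public T next()
  {
    // Assumption: a second call follows the Iterator contract instead of rethrowing.
    if (thrown) {
      throw new NoSuchElementException();
    }
    thrown = true;
    throw exception;
  }

  public static void main(String[] args)
  {
    Iterator<String> it = new ThrowOnceIterator<>(new IOException("boom"));
    try {
      it.next();
    }
    catch (RuntimeException e) {
      System.out.println("caught: " + e.getCause());
    }
    System.out.println("hasNext after throw: " + it.hasNext());
  }
}
```

The point of the pattern is that a failure while opening or parsing the input surfaces when the caller iterates, which is where per-row parse errors are already handled.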




[GitHub] [druid] FrankChen021 commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r494728667



##########
File path: core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
##########
@@ -33,13 +40,98 @@
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
-public class JsonReader extends TextReader
+/**
+ * <pre>
+ * In contrast to {@link JsonLineReader} which processes input text line by line independently,
+ * this class tries to parse the input text as a whole to an array of objects.
+ *
+ * The input text can be:
+ * 1. a JSON string of an object in a line or multiple lines(such as pretty-printed JSON text)
+ * 2. multiple JSON object strings concatenated by whitespace character(s)
+ *
+ * For case 2, what should be noticed is that if an exception is thrown when parsing one JSON string,
+ * the rest of the JSON text will be ignored
+ *
+ * For more information, see: https://github.com/apache/druid/pull/10383
+ * </pre>
+ */
+public class JsonReader implements InputEntityReader

Review comment:
       > The sampler currently assumes that there is only one JSON object in an input chunk which could have either an array or a nested object. 
   
   That's the reason `ExceptionThrowingIterator` was extracted and `JsonReader` implements `InputEntityReader` directly.
   
   Your suggestion provides a new and simple way to deal with it. I'll test the code later.




[GitHub] [druid] jihoonson commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r520758961



##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
##########
@@ -1163,6 +1172,121 @@ public void testIndexParseException() throws IOException
     );
   }
 
+  /**
+   *
+   * This case tests sampling for multiple json lines in one text block
+   * Currently only RecordSupplierInputSource supports this kind of input, see https://github.com/apache/druid/pull/10383 for more information
+   *
+   * This test combines illegal json block and legal json block together to verify:
+   * 1. all lines in the illegal json block should not be parsed
+   * 2. the illegal json block should not affect the processing of the 2nd record
+   * 3. all lines in legal json block should be parsed successfully
+   *
+   */
+  @Test
+  public void testMultipleJsonStringInOneBlock() throws IOException
+  {
+    if (!ParserType.STR_JSON.equals(parserType) || !useInputFormatApi) {
+      return;
+    }
+
+    final TimestampSpec timestampSpec = new TimestampSpec("t", null, null);
+    final DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        ImmutableList.of(StringDimensionSchema.create("dim1PlusBar"))
+    );
+    final TransformSpec transformSpec = new TransformSpec(
+        null,
+        ImmutableList.of(new ExpressionTransform("dim1PlusBar", "concat(dim1 + 'bar')", TestExprMacroTable.INSTANCE))
+    );
+    final AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
+    final GranularitySpec granularitySpec = new UniformGranularitySpec(
+        Granularities.DAY,
+        Granularities.HOUR,
+        true,
+        null
+    );
+    final DataSchema dataSchema = createDataSchema(
+        timestampSpec,
+        dimensionsSpec,
+        aggregatorFactories,
+        granularitySpec,
+        transformSpec
+    );
+
+    List<String> jsonBlockList = ImmutableList.of(
        // include the line which can't be parsed into JSON object to form an illegal json block
+        String.join("", STR_JSON_ROWS),
+
+        // exclude the last line to form a legal json block
+        String.join("", STR_JSON_ROWS.stream().limit(STR_JSON_ROWS.size() - 1).collect(Collectors.toList()))

Review comment:
       > [ERROR] indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java:1221 -- Can be replaced with 'Collectors.joining'
   
   It seems the IntelliJ Inspection CI doesn't like this line.
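
The inspection is presumably flagging the `String.join("", ...collect(Collectors.toList()))` pattern. A minimal sketch of the equivalent `Collectors.joining` form; `ROWS` is a hypothetical stand-in for `STR_JSON_ROWS`, whose real contents are not shown here:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class JoiningSketch
{
  // Hypothetical stand-in for STR_JSON_ROWS; the last entry plays the unparseable line.
  static final List<String> ROWS = Arrays.asList("{\"a\":1}", "{\"a\":2}", "not json");

  // Concatenates every row except the last one without building an intermediate list.
  // Assumes a non-empty list, because limit() rejects a negative size.
  static String joinAllButLast(List<String> rows)
  {
    return rows.stream()
               .limit(rows.size() - 1)
               .collect(Collectors.joining());
  }

  public static void main(String[] args)
  {
    System.out.println(joinAllButLast(ROWS));
  }
}
```

`Collectors.joining()` with no arguments uses an empty delimiter, so the result matches what `String.join("", list)` produces.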




[GitHub] [druid] FrankChen021 commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r513234615



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
##########
@@ -131,17 +130,27 @@ public SamplerResponse sample(
             continue;
           }
 
-          for (InputRow row : inputRowListPlusRawValues.getInputRows()) {
-            index.add(new SamplerInputRow(row, counter), true);
+          for (int i = 0; i < rawColumnsList.size(); i++) {
+            Map<String, Object> rawColumns = rawColumnsList.get(i);
+            InputRow row = inputRowListPlusRawValues.getInputRows().get(i);
+
+            //keep the index of the row to be added to responseRows for further use
+            final int rowIndex = responseRows.size();
+            index.add(new SamplerInputRow(row, rowIndex), true);
+
             // store the raw value; will be merged with the data from the IncrementalIndex later
-            responseRows[counter] = new SamplerResponseRow(rawColumns, null, null, null);
-            counter++;
+            responseRows.add(new SamplerResponseRow(rawColumns, null, null, null));
             numRowsIndexed++;
           }
         }
         catch (ParseException e) {
-          responseRows[counter] = new SamplerResponseRow(rawColumns, null, true, e.getMessage());
-          counter++;
+          if (rawColumnsList != null) {
+            responseRows.addAll(rawColumnsList.stream()
+                                              .map(rawColumns -> new SamplerResponseRow(rawColumns, null, true, e.getMessage()))
+                                              .collect(Collectors.toList()));

Review comment:
       Got it. BTW, should we also handle the exception returned in `IncrementalIndexAddResult` by adding the corresponding row to `responseRows`?




[GitHub] [druid] FrankChen021 edited a comment on pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 edited a comment on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-691185618






[GitHub] [druid] FrankChen021 commented on pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on pull request #10383:
URL: https://github.com/apache/druid/pull/10383#issuecomment-691386393


   @gianm @jihoonson Thanks for the suggestions. Option 2 is much easier for us to control. Since a parameter is introduced on `JsonInputFormat`, there will be two JSON reader classes, `JsonReader` and `JsonLineReader`, to keep the code easy to maintain. The former implements `InputEntityReader` directly and uses `ObjectMapper.readValues` to parse the input text. `JsonLineReader` is the original `JsonReader` renamed; it inherits from `TextReader` and processes JSON line by line.
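
To illustrate why line-by-line reading breaks on pretty-printed input while whole-text parsing does not, here is a rough sketch. It is a hand-rolled brace-depth splitter used as a stand-in, not Jackson's `readValues` and not the PR's code; the class name `JsonChunkSplitter` is hypothetical, and it only handles top-level objects without validating them:

```java
import java.util.ArrayList;
import java.util.List;

final class JsonChunkSplitter
{
  // Splits text made of top-level JSON objects separated by optional whitespace.
  // Tracks brace depth and string literals only; it does not validate the JSON.
  static List<String> split(String text)
  {
    final List<String> objects = new ArrayList<>();
    int depth = 0;
    int start = -1;
    boolean inString = false;
    boolean escaped = false;

    for (int i = 0; i < text.length(); i++) {
      final char c = text.charAt(i);
      if (inString) {
        // Braces inside string literals must not change the depth.
        if (escaped) {
          escaped = false;
        } else if (c == '\\') {
          escaped = true;
        } else if (c == '"') {
          inString = false;
        }
        continue;
      }
      if (c == '"') {
        inString = true;
      } else if (c == '{') {
        if (depth == 0) {
          start = i;
        }
        depth++;
      } else if (c == '}' && depth > 0) {
        depth--;
        if (depth == 0) {
          // A top-level object just closed; emit it as one chunk.
          objects.add(text.substring(start, i + 1));
          start = -1;
        }
      }
    }
    return objects;
  }

  public static void main(String[] args)
  {
    final String text = "{\n  \"a\": 1\n}\n{\"b\": \"}\"}";
    System.out.println("lines:   " + text.split("\n").length);
    System.out.println("objects: " + split(text).size());
  }
}
```

A line-oriented reader sees each physical line of the pretty-printed object as a separate, invalid record, whereas a whole-text reader only cares where each object ends.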


[GitHub] [druid] jihoonson commented on a change in pull request #10383: Fix ingestion failure of pretty-formatted JSON message

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r513646409



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
##########
@@ -131,17 +130,27 @@ public SamplerResponse sample(
             continue;
           }
 
-          for (InputRow row : inputRowListPlusRawValues.getInputRows()) {
-            index.add(new SamplerInputRow(row, counter), true);
+          for (int i = 0; i < rawColumnsList.size(); i++) {
+            Map<String, Object> rawColumns = rawColumnsList.get(i);
+            InputRow row = inputRowListPlusRawValues.getInputRows().get(i);
+
+            //keep the index of the row to be added to responseRows for further use
+            final int rowIndex = responseRows.size();
+            index.add(new SamplerInputRow(row, rowIndex), true);
+
             // store the raw value; will be merged with the data from the IncrementalIndex later
-            responseRows[counter] = new SamplerResponseRow(rawColumns, null, null, null);
-            counter++;
+            responseRows.add(new SamplerResponseRow(rawColumns, null, null, null));
             numRowsIndexed++;
           }
         }
         catch (ParseException e) {
-          responseRows[counter] = new SamplerResponseRow(rawColumns, null, true, e.getMessage());
-          counter++;
+          if (rawColumnsList != null) {
+            responseRows.addAll(rawColumnsList.stream()
+                                              .map(rawColumns -> new SamplerResponseRow(rawColumns, null, true, e.getMessage()))
+                                              .collect(Collectors.toList()));

Review comment:
       Yes, I think so, because the sampler result should match what will actually be ingested into Druid.
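
A minimal sketch of that idea, with simplified hypothetical stand-ins: `ResponseRow` plays the role of `SamplerResponseRow`, and `addToIndex` stands in for adding a row to the index and inspecting the returned add result. None of these names or signatures are Druid's actual API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class SamplerSketch
{
  // Hypothetical, simplified stand-in for SamplerResponseRow.
  static final class ResponseRow
  {
    final Map<String, Object> raw;
    final boolean unparseable;
    final String error;

    ResponseRow(Map<String, Object> raw, boolean unparseable, String error)
    {
      this.raw = raw;
      this.unparseable = unparseable;
      this.error = error;
    }
  }

  // addToIndex is a hypothetical stand-in: it returns null on success,
  // or an error message when the add result carries a parse exception.
  static List<ResponseRow> sample(
      List<Map<String, Object>> rawColumnsList,
      Function<Map<String, Object>, String> addToIndex
  )
  {
    final List<ResponseRow> responseRows = new ArrayList<>();
    for (Map<String, Object> rawColumns : rawColumnsList) {
      final String error = addToIndex.apply(rawColumns);
      // Every raw row gets a response row, so failed rows stay visible in the output.
      responseRows.add(new ResponseRow(rawColumns, error != null, error));
    }
    return responseRows;
  }

  public static void main(String[] args)
  {
    final List<Map<String, Object>> raw = Arrays.asList(
        Collections.<String, Object>singletonMap("met1", 1),
        Collections.<String, Object>singletonMap("met1", "foo")
    );
    final List<ResponseRow> rows = sample(
        raw,
        r -> r.get("met1") instanceof Number ? null : "met1 is not a number"
    );
    for (ResponseRow row : rows) {
      System.out.println(row.raw + " unparseable=" + row.unparseable);
    }
  }
}
```

With this shape, a row that fails while being added is reported as unparseable with its message rather than silently dropped, which keeps the sampler output aligned with what ingestion would accept.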



