Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/09/01 01:24:29 UTC

[GitHub] [druid] jihoonson opened a new pull request #10336: More structured way to handle parse exceptions

jihoonson opened a new pull request #10336:
URL: https://github.com/apache/druid/pull/10336


   ### Description
   
   This PR refactors the handling of `ParseException`s in ingestion so that the same logic can easily be supported for parallel tasks as well. A `ParseException` can be thrown in two places on the ingestion side: when the input data is parsed according to the input format (in `InputEntityReader`), and when the parsed object is converted to the desired data type (in `IncrementalIndex`). To handle these exceptions in a more structured way, I added `ParseExceptionHandler`, which handles all `ParseException`s thrown on the ingestion side. 
   
   `FilteringCloseableInputRowIterator` was added to handle common row filters. It also has the parseExceptionHandler to handle parseExceptions thrown while parsing input data. The same parseExceptionHandler instance is passed down to `AppenderatorImpl` to handle parseExceptions thrown while converting parsed objects.
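
A minimal sketch of the handler idea described above, in plain Java with no Druid dependencies. The class name `SketchParseExceptionHandler`, its constructor parameters, and the use of `RuntimeException` as a stand-in for `ParseException` are assumptions made for illustration; this is not the class added in this PR, and logging is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified stand-in for a parse exception handler.
// RuntimeException stands in for Druid's ParseException.
class SketchParseExceptionHandler
{
  private final int maxAllowedParseExceptions;
  private final int maxSavedParseExceptions;
  private final Deque<RuntimeException> saved = new ArrayDeque<>();
  private long unparseable = 0;

  SketchParseExceptionHandler(int maxAllowedParseExceptions, int maxSavedParseExceptions)
  {
    this.maxAllowedParseExceptions = maxAllowedParseExceptions;
    this.maxSavedParseExceptions = maxSavedParseExceptions;
  }

  void handle(RuntimeException e)
  {
    // Update the metric first, whatever else happens.
    unparseable++;

    // Keep only the most recent N exceptions in memory.
    if (maxSavedParseExceptions > 0) {
      if (saved.size() == maxSavedParseExceptions) {
        saved.removeFirst();
      }
      saved.addLast(e);
    }

    // Fail once more exceptions have been seen than are allowed.
    if (unparseable > maxAllowedParseExceptions) {
      throw new RuntimeException(
          "Max parse exceptions[" + maxAllowedParseExceptions + "] exceeded", e
      );
    }
  }

  long getUnparseable()
  {
    return unparseable;
  }

  List<RuntimeException> getSaved()
  {
    return new ArrayList<>(saved);
  }
}
```

Because one instance owns both the counter and the saved exceptions, the same object can be handed to the reader side and to the index side, which is the sharing the description refers to.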
   
   <hr>
   
   This PR has:
   - [x] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/licenses.yaml)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   
   <hr>
   
   ##### Key changed/added classes in this PR
    * `ParseExceptionHandler`
    * `FilteringCloseableInputRowIterator`
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] abhishekagarwal87 commented on pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on pull request #10336:
URL: https://github.com/apache/druid/pull/10336#issuecomment-690898961


   @jihoonson - Honestly, I haven't done a full review but +1 based on @suneet-s's review/approval. 




[GitHub] [druid] liran-funaro commented on a change in pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10336:
URL: https://github.com/apache/druid/pull/10336#discussion_r484402253



##########
File path: processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
##########
@@ -227,9 +224,7 @@ protected AddToFactsResult addToFacts(
           }
           catch (ParseException e) {
             // "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
-            if (getReportParseExceptions()) {
-              throw e;
-            }
+            throw e;

Review comment:
       If we never intend to log/report this exception, we can remove the try/catch clause entirely.

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
##########
@@ -186,7 +185,7 @@ protected AddToFactsResult addToFacts(
       } else {
         // We lost a race
         aggs = concurrentGet(prev);
-        parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
+        doAggregate(metrics, aggs, rowContainer, row, parseExceptionMessages);

Review comment:
       Currently (both before and after this PR), a row that contains partially corrupted or incorrect data is still aggregated, but only partially.
   This may lead the user to infer wrong statistics, because the aggregators end up having accumulated different numbers of rows.
   IMO, whether to allow this scenario should be the user's decision.
   But since there is currently no way to roll back an aggregation, I don't see how this could be implemented easily.
   Adding parsing as a preliminary step before aggregation might work, but that is an entirely different subject.
   "Returning early without the second aggregation" would only aggravate this scenario, so I don't think it is the way to go either. 

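The "parse as a preliminary step" idea mentioned above could look roughly like the following sketch. It is not how `OnheapIncrementalIndex` works; `TwoPhaseAggregation`, its string-valued rows, and its long-sum metrics are hypothetical simplifications chosen to show the all-or-nothing behavior.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: validate every metric value of a row before any
// aggregator state is touched, so a bad row is never partially aggregated.
class TwoPhaseAggregation
{
  // Running sums keyed by metric name; stands in for per-metric aggregators.
  private final Map<String, Long> sums = new LinkedHashMap<>();

  // Returns true if the row was aggregated, false if it was rejected whole.
  boolean aggregate(Map<String, String> row, String... metrics)
  {
    // Phase 1: parse everything; bail out before mutating any state.
    final long[] parsed = new long[metrics.length];
    for (int i = 0; i < metrics.length; i++) {
      try {
        parsed[i] = Long.parseLong(row.get(metrics[i]));
      }
      catch (NumberFormatException e) {
        return false;
      }
    }

    // Phase 2: every value is known to be good, so apply them all.
    for (int i = 0; i < metrics.length; i++) {
      sums.merge(metrics[i], parsed[i], Long::sum);
    }
    return true;
  }

  long sum(String metric)
  {
    return sums.getOrDefault(metric, 0L);
  }
}
```

With this shape every aggregator sees the same set of rows, at the cost of buffering the parsed values for one row.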





[GitHub] [druid] jihoonson commented on a change in pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10336:
URL: https://github.com/apache/druid/pull/10336#discussion_r482314233



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -135,6 +148,49 @@ public void stopGracefully(TaskConfig taskConfig)
     }
   }
 
+  public static FilteringCloseableInputRowIterator inputSourceReader(
+      File tmpDir,
+      DataSchema dataSchema,
+      InputSource inputSource,
+      @Nullable InputFormat inputFormat,
+      Predicate<InputRow> rowFilter,
+      RowIngestionMeters ingestionMeters,
+      ParseExceptionHandler parseExceptionHandler
+  ) throws IOException
+  {
+    final List<String> metricsNames = Arrays.stream(dataSchema.getAggregators())
+                                            .map(AggregatorFactory::getName)
+                                            .collect(Collectors.toList());
+    final InputSourceReader inputSourceReader = dataSchema.getTransformSpec().decorate(
+        inputSource.reader(
+            new InputRowSchema(
+                dataSchema.getTimestampSpec(),
+                dataSchema.getDimensionsSpec(),
+                metricsNames
+            ),

Review comment:
       Yeah, it would be useful. Alternatively, we could change `InputRowSchema` to carry the type information of all metrics as well, so that exceptions in both parsing and type casting are checked only in `InputEntityReader`. After such a change, you would not have to worry about parseExceptions in `IncrementalIndex` anymore, and we probably wouldn't need `ParseExceptionHandler` at all. I haven't done this refactoring here because it would be quite complicated and this PR is already big enough. 






[GitHub] [druid] jihoonson commented on pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #10336:
URL: https://github.com/apache/druid/pull/10336#issuecomment-687276293


   Ah I didn't check the code coverage. I manually checked them instead. Well, not only in `IndexTaskTest`, but there are also many tests in `KafkaIndexTaskTest` and `KinesisIndexTaskTest` which verify `processed` and `unparseable` metrics. All those tests would fail if these lines didn't work properly.




[GitHub] [druid] jihoonson merged pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
jihoonson merged pull request #10336:
URL: https://github.com/apache/druid/pull/10336


   




[GitHub] [druid] suneet-s commented on a change in pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
suneet-s commented on a change in pull request #10336:
URL: https://github.com/apache/druid/pull/10336#discussion_r482150156



##########
File path: core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
##########
@@ -93,23 +94,32 @@ public static InputRow parse(
     final DateTime timestamp;
     try {
       timestamp = timestampSpec.extractTimestamp(theMap);
-      if (timestamp == null) {
-        final String input = theMap.toString();
-        throw new NullPointerException(
-            StringUtils.format(
-                "Null timestamp in input: %s",
-                input.length() < 100 ? input : input.substring(0, 100) + "..."
-            )
-        );
-      }
     }
     catch (Exception e) {
-      throw new ParseException(e, "Unparseable timestamp found! Event: %s", theMap);
+      throw new ParseException(e, "Unparseable timestamp found! Event: %s", rawMapToPrint(theMap));
+    }
+    if (timestamp == null) {
+      throw new ParseException("Unparseable timestamp found! Event: %s", rawMapToPrint(theMap));
+    }
+    if (!Intervals.ETERNITY.contains(timestamp)) {

Review comment:
       wow, I never thought of this case

##########
File path: core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
##########
@@ -93,23 +94,32 @@ public static InputRow parse(
     final DateTime timestamp;
     try {
       timestamp = timestampSpec.extractTimestamp(theMap);
-      if (timestamp == null) {
-        final String input = theMap.toString();
-        throw new NullPointerException(
-            StringUtils.format(
-                "Null timestamp in input: %s",
-                input.length() < 100 ? input : input.substring(0, 100) + "..."
-            )
-        );
-      }
     }
     catch (Exception e) {
-      throw new ParseException(e, "Unparseable timestamp found! Event: %s", theMap);
+      throw new ParseException(e, "Unparseable timestamp found! Event: %s", rawMapToPrint(theMap));
+    }
+    if (timestamp == null) {
+      throw new ParseException("Unparseable timestamp found! Event: %s", rawMapToPrint(theMap));

Review comment:
       Instead of printing the whole map, I think it would be better to print just the timestamp string that was unparseable.
   The same applies to line 99. This would remove the need for `rawMapToPrint` and guarantee that the invalid timestamp is always logged, regardless of where in the event it is.
   
   ```suggestion
         throw new ParseException("Unparseable timestamp found! Timestamp column (%s): %s", timestampSpec.getTimestampColumn(), theMap.get(timestampSpec.getTimestampColumn()));
   ```

##########
File path: extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
##########
@@ -106,6 +103,9 @@
 import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
 import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
 import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;

Review comment:
       nit: Maybe these 3 classes should be moved into their own sub package `org.apache.druid.segment.incremental.stats`

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -135,6 +148,49 @@ public void stopGracefully(TaskConfig taskConfig)
     }
   }
 
+  public static FilteringCloseableInputRowIterator inputSourceReader(
+      File tmpDir,
+      DataSchema dataSchema,
+      InputSource inputSource,
+      @Nullable InputFormat inputFormat,
+      Predicate<InputRow> rowFilter,
+      RowIngestionMeters ingestionMeters,
+      ParseExceptionHandler parseExceptionHandler
+  ) throws IOException
+  {
+    final List<String> metricsNames = Arrays.stream(dataSchema.getAggregators())
+                                            .map(AggregatorFactory::getName)
+                                            .collect(Collectors.toList());
+    final InputSourceReader inputSourceReader = dataSchema.getTransformSpec().decorate(
+        inputSource.reader(
+            new InputRowSchema(
+                dataSchema.getTimestampSpec(),
+                dataSchema.getDimensionsSpec(),
+                metricsNames
+            ),

Review comment:
       This is calculated in 3 places in the code: `AbstractBatchIndexTask`, `InputSourceSampler` and `SeekableStreamIndexTaskRunner`. I think it would be a good idea to consolidate these so that `metricsNames` is always calculated the same way for a given `dataSchema`.
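
One way the consolidation suggested above might look is a single shared helper, sketched here without Druid dependencies. `NamedAggregator` and `MetricNames` are hypothetical names; `NamedAggregator` only stands in for the `getName()` part of `AggregatorFactory`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in exposing only the name of an aggregator.
interface NamedAggregator
{
  String getName();
}

// Hypothetical single home for the computation, so every caller derives
// the metric names from the aggregators in exactly the same way.
final class MetricNames
{
  private MetricNames()
  {
  }

  static List<String> of(NamedAggregator[] aggregators)
  {
    return Arrays.stream(aggregators)
                 .map(NamedAggregator::getName)
                 .collect(Collectors.toList());
  }
}
```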

##########
File path: core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
##########
@@ -93,23 +94,32 @@ public static InputRow parse(
     final DateTime timestamp;
     try {
       timestamp = timestampSpec.extractTimestamp(theMap);
-      if (timestamp == null) {
-        final String input = theMap.toString();
-        throw new NullPointerException(
-            StringUtils.format(
-                "Null timestamp in input: %s",
-                input.length() < 100 ? input : input.substring(0, 100) + "..."
-            )
-        );
-      }
     }
     catch (Exception e) {
-      throw new ParseException(e, "Unparseable timestamp found! Event: %s", theMap);
+      throw new ParseException(e, "Unparseable timestamp found! Event: %s", rawMapToPrint(theMap));
+    }
+    if (timestamp == null) {
+      throw new ParseException("Unparseable timestamp found! Event: %s", rawMapToPrint(theMap));
+    }
+    if (!Intervals.ETERNITY.contains(timestamp)) {
+      throw new ParseException(
+          "Encountered row with timestamp that cannot be represented as a long: [%s]",
+          rawMapToPrint(theMap)
+      );

Review comment:
       After your refactoring in this change, the `parse` functions on lines 65 and 70 can be made private and package-private (`@VisibleForTesting`), respectively.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -135,6 +148,49 @@ public void stopGracefully(TaskConfig taskConfig)
     }
   }
 
+  public static FilteringCloseableInputRowIterator inputSourceReader(

Review comment:
       javadocs please

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
##########
@@ -151,7 +151,7 @@ private static String makeTaskId(RealtimeAppenderatorIngestionSpec spec)
   private volatile Thread runThread = null;
 
   @JsonIgnore
-  private CircularBuffer<Throwable> savedParseExceptions;
+  private ParseExceptionHandler parseExceptionHandler;

Review comment:
       Should this also be annotated with `@MonotonicNonNull`?

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
##########
@@ -550,7 +546,9 @@ public Response getUnparseableEvents(
   )
   {
     IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
-    List<String> events = IndexTaskUtils.getMessagesFromSavedParseExceptions(savedParseExceptions);
+    List<String> events = IndexTaskUtils.getMessagesFromSavedParseExceptions(
+        parseExceptionHandler.getSavedParseExceptions()

Review comment:
       parseExceptionHandler can be null if the `task.run()` hasn't been called. This is probably unlikely?






[GitHub] [druid] jihoonson commented on a change in pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10336:
URL: https://github.com/apache/druid/pull/10336#discussion_r482502260



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
##########
@@ -54,12 +64,16 @@
       @Nullable InputFormat inputFormat,
       InputRowSchema inputRowSchema,
       TransformSpec transformSpec,
-      File indexingTmpDir
+      File indexingTmpDir,
+      Predicate<InputRow> rowFilter,

Review comment:
       > I don't think we should use a Predicate<InputRow> here and in other similar places. Throwing a ParseException is expected behavior, so I think callers should know that they need to handle that exception or throw it themselves.
   >
   > It's not obvious to me how the caller in FilteringCloseableInputRowIterator should know they should handle a parse exception https://github.com/apache/druid/pull/10336/files#diff-b7f61f3d28afdd25bedf2efa607fdf88R67
   
   Hmm, I'm not sure I understand your concern. Can you elaborate? `rowFilter` is a simple predicate that filters out unnecessary rows and does nothing with parseExceptions itself. If a ParseException is thrown in `test()`, it simply propagates to the caller. The intention here is that callers of `FilteringCloseableInputRowIterator` can iterate over rows without worrying about row validation or parseExceptions.
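
A rough sketch of that intention, assuming a generic iterator rather than the actual `FilteringCloseableInputRowIterator`. The class `FilteringIteratorSketch`, the `Consumer`-based error callback, and `IllegalArgumentException` as a stand-in for `ParseException` are all assumptions for illustration; rows are assumed to be non-null.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch: wraps a row iterator, applies a filter, and routes
// parse failures to a callback so the consumer only ever sees valid rows.
class FilteringIteratorSketch<T> implements Iterator<T>
{
  private final Iterator<T> delegate;
  private final Predicate<T> filter;
  private final Consumer<RuntimeException> onParseError;
  private T next;

  FilteringIteratorSketch(Iterator<T> delegate, Predicate<T> filter, Consumer<RuntimeException> onParseError)
  {
    this.delegate = delegate;
    this.filter = filter;
    this.onParseError = onParseError;
  }

  @Override
  public boolean hasNext()
  {
    while (next == null && delegate.hasNext()) {
      try {
        // Either reading the row or testing it may fail to parse.
        final T row = delegate.next();
        if (filter.test(row)) {
          next = row;
        }
      }
      catch (IllegalArgumentException e) {
        // Hand the failure to the callback and move on to the next row.
        onParseError.accept(e);
      }
    }
    return next != null;
  }

  @Override
  public T next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    final T row = next;
    next = null;
    return row;
  }
}
```

The predicate stays a plain `Predicate`: it is free to throw, and the wrapping iterator is the one place that decides what happens to the exception.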






[GitHub] [druid] jihoonson commented on pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #10336:
URL: https://github.com/apache/druid/pull/10336#issuecomment-691353689


   @abhishekagarwal87 @liran-funaro thanks for the review. 




[GitHub] [druid] jihoonson commented on a change in pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10336:
URL: https://github.com/apache/druid/pull/10336#discussion_r481408656



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/ParseExceptionHandler.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.utils.CircularBuffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * A handler for {@link ParseException}s thrown during ingestion. Based on the given configuration, this handler can
+ *
+ * - log ParseExceptions.
+ * - keep most recent N ParseExceptions in memory.
+ * - throw a RuntimeException when it sees more ParseExceptions than {@link #maxAllowedParseExceptions}.
+ *
+ * No matter what the handler does, the relevant metric should be updated first.
+ */
+public class ParseExceptionHandler
+{
+  private static final Logger LOG = new Logger(ParseExceptionHandler.class);
+
+  private final RowIngestionMeters rowIngestionMeters;
+  private final boolean logParseExceptions;
+  private final int maxAllowedParseExceptions;
+  @Nullable
+  private final CircularBuffer<Throwable> savedParseExceptions;

Review comment:
       Oh yeah, it can. I just copied existing code without noticing it. Changed it now.






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10336:
URL: https://github.com/apache/druid/pull/10336#discussion_r481041374



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/ParseExceptionHandler.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.utils.CircularBuffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * A handler for {@link ParseException}s thrown during ingestion. Based on the given configuration, this handler can
+ *
+ * - log ParseExceptions.
+ * - keep most recent N ParseExceptions in memory.
+ * - throw a RuntimeException when it sees more ParseExceptions than {@link #maxAllowedParseExceptions}.
+ *
+ * No matter what the handler does, the relevant metric should be updated first.
+ */
+public class ParseExceptionHandler
+{
+  private static final Logger LOG = new Logger(ParseExceptionHandler.class);
+
+  private final RowIngestionMeters rowIngestionMeters;
+  private final boolean logParseExceptions;
+  private final int maxAllowedParseExceptions;
+  @Nullable
+  private final CircularBuffer<Throwable> savedParseExceptions;

Review comment:
       just curious, can this be `private final CircularBuffer<ParseException>` itself?






[GitHub] [druid] suneet-s commented on a change in pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
suneet-s commented on a change in pull request #10336:
URL: https://github.com/apache/druid/pull/10336#discussion_r483375527



##########
File path: server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
##########
@@ -267,6 +275,12 @@ public AppenderatorAddResult add(
       throw new SegmentNotWritableException("Attempt to add row to swapped-out sink for segment[%s].", identifier);
     }
 
+    if (addResult.isRowAdded()) {
+      rowIngestionMeters.incrementProcessed();
+    } else if (addResult.hasParseException()) {
+      parseExceptionHandler.handle(addResult.getParseException());
+    }

Review comment:
       Test coverage bot is complaining about a lack of coverage for this condition. I think we should add tests for these branches. Right now `AppenderatorTest` only tests the case where `addResult.isRowAdded()` is true.
   
   Covering all 4 branches should make the code coverage bot happy too.
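
A sketch of the inputs such a test would need, assuming "all 4 branches" means both outcomes of each of the two conditions. `SketchAddResult` and `SketchAppenderator` are hypothetical stand-ins, not the Druid classes, and the counters replace the real `rowIngestionMeters` and `parseExceptionHandler` calls.

```java
// Hypothetical stand-in for the add result inspected in the diff above.
class SketchAddResult
{
  private final boolean rowAdded;
  private final RuntimeException parseException; // null when nothing failed to parse

  SketchAddResult(boolean rowAdded, RuntimeException parseException)
  {
    this.rowAdded = rowAdded;
    this.parseException = parseException;
  }

  boolean isRowAdded()
  {
    return rowAdded;
  }

  boolean hasParseException()
  {
    return parseException != null;
  }
}

// Hypothetical stand-in that mirrors only the if / else-if under discussion.
class SketchAppenderator
{
  int processed = 0;
  int handledParseExceptions = 0;

  void record(SketchAddResult addResult)
  {
    if (addResult.isRowAdded()) {
      processed++;
    } else if (addResult.hasParseException()) {
      handledParseExceptions++;
    }
  }
}
```

Three inputs are enough to hit every outcome: a row that was added, a row that was not added and carries a parse exception, and a row that was not added and carries none.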






[GitHub] [druid] jihoonson commented on pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #10336:
URL: https://github.com/apache/druid/pull/10336#issuecomment-690808746


   @suneet-s thanks for the review! @abhishekagarwal87 @liran-funaro do you have more comments?




[GitHub] [druid] jihoonson commented on a change in pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10336:
URL: https://github.com/apache/druid/pull/10336#discussion_r482501676



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
##########
@@ -186,7 +185,7 @@ protected AddToFactsResult addToFacts(
       } else {
         // We lost a race
         aggs = concurrentGet(prev);
-        parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
+        doAggregate(metrics, aggs, rowContainer, row, parseExceptionMessages);

Review comment:
       I'm not a big fan of that either, but it seems strange to me to return exception messages as a result of aggregation. I'm actually not sure whether we can return early without the second aggregation when the first aggregation throws parseExceptions. I don't want to touch that part yet, at least not in this PR.






[GitHub] [druid] jihoonson commented on a change in pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10336:
URL: https://github.com/apache/druid/pull/10336#discussion_r485069151



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
##########
@@ -186,7 +185,7 @@ protected AddToFactsResult addToFacts(
       } else {
         // We lost a race
         aggs = concurrentGet(prev);
-        parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
+        doAggregate(metrics, aggs, rowContainer, row, parseExceptionMessages);

Review comment:
       Good point. I'm not sure whether that behaviour is intended, but it seems strange to me. I think all parse operations should be done in `InputSourceReader.read()` as a preliminary step so that we don't have to worry about ParseExceptions in IncrementalIndex. But yeah, this is a different subject.






[GitHub] [druid] jihoonson merged pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
jihoonson merged pull request #10336:
URL: https://github.com/apache/druid/pull/10336








[GitHub] [druid] jihoonson commented on pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #10336:
URL: https://github.com/apache/druid/pull/10336#issuecomment-691353689








[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10336:
URL: https://github.com/apache/druid/pull/10336#discussion_r482325301



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
##########
@@ -59,7 +65,9 @@ public static Appenderator newAppenderator(
       TaskToolbox toolbox,
       DataSchema dataSchema,
       AppenderatorConfig appenderatorConfig,
-      DataSegmentPusher segmentPusher
+      DataSegmentPusher segmentPusher,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler

Review comment:
       FWIW, I think the current implementation is the better one.






[GitHub] [druid] jihoonson commented on pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #10336:
URL: https://github.com/apache/druid/pull/10336#issuecomment-686045841


   > In general, a common question I've asked myself while reviewing this patch is are there any places where we now parse more rows vs failing earlier in the process by directly throwing an exception. It was hard for me to discern that reading this patch. Do you have any suggestions on how to look for cases where an exception is being swallowed instead of being thrown immediately?
   
   I mainly touched Kafka/Kinesis tasks and IndexTask, and we have tests for them that verify the metrics in `RowIngestionMeters`. Check out `IndexTaskTest`, `KafkaIndexTaskTest`, and `KinesisIndexTaskTest`. So I believe there is no bug, at least in what is covered by those tests.
   
   One of the goals of this PR is clarifying where `ParseException` can be thrown. I first tried to change `ParseException` to be a checked `Exception` instead of a `RuntimeException`, but that requires modifying all method signatures used on the query side even though `ParseException` will not be thrown from them. Since this doesn't look nice, I decided to scope down the places that can potentially throw `ParseException`s. After this PR, the only possible places are `InputEntityReader` (and all classes it uses) and `IncrementalIndex.addToFacts()`; any `ParseException` thrown elsewhere is a bug.




[GitHub] [druid] liran-funaro commented on pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on pull request #10336:
URL: https://github.com/apache/druid/pull/10336#issuecomment-690952441










[GitHub] [druid] jihoonson commented on pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #10336:
URL: https://github.com/apache/druid/pull/10336#issuecomment-689914438


   It turns out that I can easily add a new test for missing branches, so I added one. Hopefully the bot is happy now.




[GitHub] [druid] abhishekagarwal87 commented on pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on pull request #10336:
URL: https://github.com/apache/druid/pull/10336#issuecomment-690898961


   @jihoonson - Honestly, I haven't done a full review but +1 based on @suneet-s's review/approval. 




[GitHub] [druid] suneet-s commented on a change in pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
suneet-s commented on a change in pull request #10336:
URL: https://github.com/apache/druid/pull/10336#discussion_r483375527



##########
File path: server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
##########
@@ -267,6 +275,12 @@ public AppenderatorAddResult add(
       throw new SegmentNotWritableException("Attempt to add row to swapped-out sink for segment[%s].", identifier);
     }
 
+    if (addResult.isRowAdded()) {
+      rowIngestionMeters.incrementProcessed();
+    } else if (addResult.hasParseException()) {
+      parseExceptionHandler.handle(addResult.getParseException());
+    }

Review comment:
       Test coverage bot is complaining about a lack of coverage for this condition. I think we should add tests for these branches. Right now `AppenderatorTest` only tests `addResult.isRowAdded`
   
   Covering all 4 branches should make the code coverage bot happy too.






[GitHub] [druid] jihoonson commented on pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #10336:
URL: https://github.com/apache/druid/pull/10336#issuecomment-685164426


   ```
   278 L | B(1/2)        if (addResult.isRowAdded()) {
   279 L                   rowIngestionMeters.incrementProcessed();
   280 F | L | B(0/2)    } else if (addResult.hasParseException()) {
   281 L                   parseExceptionHandler.handle(addResult.getParseException());
   282 F                 }
   ```
   
   The test coverage bot is failing because of the missing tests for these branches, but they are covered by `IndexTaskTest`, which is in `indexing-service`.




[GitHub] [druid] suneet-s commented on a change in pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
suneet-s commented on a change in pull request #10336:
URL: https://github.com/apache/druid/pull/10336#discussion_r482408249



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
##########
@@ -186,7 +185,7 @@ protected AddToFactsResult addToFacts(
       } else {
         // We lost a race
         aggs = concurrentGet(prev);
-        parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
+        doAggregate(metrics, aggs, rowContainer, row, parseExceptionMessages);

Review comment:
       why not just return a List<String> like before? That seems less risky than assuming the list is mutated correctly. In general, I'm not a big fan of returning values from a function using parameters because of cases like this.
   
   Either way, this comment shouldn't be considered a blocker, just my 2 cents

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
##########
@@ -54,12 +64,16 @@
       @Nullable InputFormat inputFormat,
       InputRowSchema inputRowSchema,
       TransformSpec transformSpec,
-      File indexingTmpDir
+      File indexingTmpDir,
+      Predicate<InputRow> rowFilter,

Review comment:
       I don't think we should use a `Predicate<InputRow>` here and in other similar places. Throwing a `ParseException` is expected behavior, so I think callers should know that they need to handle that exception or throw it themselves.
   
   It's not obvious to me how the caller in `FilteringCloseableInputRowIterator` should know they should handle a parse exception https://github.com/apache/druid/pull/10336/files#diff-b7f61f3d28afdd25bedf2efa607fdf88R67
   
   How can I check that the ParseException is handled at the correct level everywhere? I realize this is challenging because ParseException is a RuntimeException.
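
One way to picture the concern: if the filter itself can throw, the iterator that applies it is the natural place to catch and route the exception. The following is a rough sketch under that assumption, not the PR's `FilteringCloseableInputRowIterator`. `SketchFilteringIterator` is a hypothetical name, `IllegalArgumentException` stands in for `ParseException`, a `Consumer<RuntimeException>` stands in for `ParseExceptionHandler`, and rows are assumed to be non-null.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical filtering iterator that routes exceptions from the delegate or the filter to a handler.
class SketchFilteringIterator<T> implements Iterator<T>
{
  private final Iterator<T> delegate;
  private final Predicate<T> filter;
  private final Consumer<RuntimeException> parseExceptionHandler;
  private T next; // the next row that passed the filter, or null if none is buffered

  SketchFilteringIterator(
      Iterator<T> delegate,
      Predicate<T> filter,
      Consumer<RuntimeException> parseExceptionHandler
  )
  {
    this.delegate = delegate;
    this.filter = filter;
    this.parseExceptionHandler = parseExceptionHandler;
  }

  @Override
  public boolean hasNext()
  {
    while (next == null && delegate.hasNext()) {
      try {
        T row = delegate.next();
        if (filter.test(row)) {
          next = row;
        }
      }
      catch (IllegalArgumentException e) {
        // Stand-in for ParseException: hand it to the handler and move on to the next row.
        parseExceptionHandler.accept(e);
      }
    }
    return next != null;
  }

  @Override
  public T next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    T row = next;
    next = null;
    return row;
  }
}
```

The handler decides whether processing continues: if it rethrows, the exception propagates out of `hasNext()`; otherwise the offending row is skipped.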






[GitHub] [druid] jihoonson commented on a change in pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10336:
URL: https://github.com/apache/druid/pull/10336#discussion_r485070100



##########
File path: processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
##########
@@ -227,9 +224,7 @@ protected AddToFactsResult addToFacts(
           }
           catch (ParseException e) {
             // "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
-            if (getReportParseExceptions()) {
-              throw e;
-            }
+            throw e;

Review comment:
       Good catch. Fixed.






[GitHub] [druid] suneet-s commented on a change in pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
suneet-s commented on a change in pull request #10336:
URL: https://github.com/apache/druid/pull/10336#discussion_r482204639



##########
File path: server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
##########
@@ -129,7 +137,9 @@ public Appenderator createOfflineAppenderatorForTask(
           dataSegmentPusher,
           objectMapper,
           indexIO,
-          indexMerger
+          indexMerger,
+          rowIngestionMeters,
+          parseExceptionHandler

Review comment:
       General comment here for all the new functions that accept `parseExceptionHandler`. `parseExceptionHandler` tends to be lazily initialized throughout the codebase, but I don't see safeguards against using an un-initialized parseExceptionHandler. Perhaps more use of `@Nullable` annotation will help surface potential places where it may be used before it's initialized

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
##########
@@ -59,7 +65,9 @@ public static Appenderator newAppenderator(
       TaskToolbox toolbox,
       DataSchema dataSchema,
       AppenderatorConfig appenderatorConfig,
-      DataSegmentPusher segmentPusher
+      DataSegmentPusher segmentPusher,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler

Review comment:
       Is there ever a case where the `rowIngestionMeters` is different than the one used in `parseExceptionHandler`? If not, I think we should just pass in the parseExceptionHandler.
   
   Similar comment on line 45

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/NoopRowIngestionMeters.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class NoopRowIngestionMeters implements RowIngestionMeters

Review comment:
       Javadocs please.
   
   Should this just be a singleton?
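
For illustration, a stateless no-op implementation could be exposed as a singleton roughly like this. The two-method `SketchMeters` interface is a deliberately tiny, hypothetical stand-in; the real `RowIngestionMeters` interface is not reproduced here.

```java
// Hypothetical, minimal meters interface used only for this sketch.
interface SketchMeters
{
  void incrementProcessed();

  long getProcessed();
}

/**
 * A meters implementation that records nothing. It holds no state, so a single shared instance is enough.
 */
final class NoopSketchMeters implements SketchMeters
{
  static final NoopSketchMeters INSTANCE = new NoopSketchMeters();

  private NoopSketchMeters()
  {
    // Private so that callers go through INSTANCE.
  }

  @Override
  public void incrementProcessed()
  {
    // Intentionally does nothing.
  }

  @Override
  public long getProcessed()
  {
    return 0;
  }
}
```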

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
##########
@@ -186,7 +185,7 @@ protected AddToFactsResult addToFacts(
       } else {
         // We lost a race
         aggs = concurrentGet(prev);
-        parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
+        doAggregate(metrics, aggs, rowContainer, row, parseExceptionMessages);

Review comment:
       This is a change in behavior. parseExceptionMessages will add to the parseExceptionMessages that were found on line 165. Previously it was re-set. Was this intentional?
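
The difference being pointed out can be reproduced in isolation. In this sketch, `AggregateSketch`, `aggregateReturning`, and `aggregateInto` are hypothetical names and `Long.parseLong` stands in for an aggregator that may fail to parse its input. The first method models the old shape (a fresh list per call), the second the new shape (appending to a list the caller owns).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of "return a list" versus "append to the caller's list".
class AggregateSketch
{
  // Old shape: every call starts from an empty list, so assigning the result resets earlier messages.
  static List<String> aggregateReturning(String[] values)
  {
    List<String> messages = new ArrayList<>();
    aggregateInto(values, messages);
    return messages;
  }

  // New shape: messages are appended, so a second call adds to whatever the first call collected.
  static void aggregateInto(String[] values, List<String> messages)
  {
    for (String value : values) {
      try {
        Long.parseLong(value);
      }
      catch (NumberFormatException e) {
        messages.add("Unparseable value: " + value);
      }
    }
  }
}
```

Aggregating the same bad row twice leaves one message with the returning variant when the result is reassigned, and two messages with the appending variant.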

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
##########
@@ -233,16 +232,13 @@ protected AddToFactsResult addToFacts(
         }
         catch (ParseException e) {
           // "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
-          if (getReportParseExceptions()) {
-            throw new ParseException(e, "Encountered parse error for aggregator[%s]", getMetricAggs()[i].getName());
-          } else {
-            log.debug(e, "Encountered parse error, skipping aggregator[%s].", getMetricAggs()[i].getName());
-          }
+          log.debug(e, "Encountered parse error, skipping aggregator[%s].", getMetricAggs()[i].getName());
+          parseExceptionMessages.add(e.getMessage());

Review comment:
       This is a change in behavior. Previously the aggs would fail fast, but now it tries to parse all of them before bubbling up all the parse exceptions. Was this change intentional? I see a similar change in at least the Onheap implementation.






[GitHub] [druid] jihoonson commented on a change in pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10336:
URL: https://github.com/apache/druid/pull/10336#discussion_r482313598



##########
File path: core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
##########
@@ -93,23 +94,32 @@ public static InputRow parse(
     final DateTime timestamp;
     try {
       timestamp = timestampSpec.extractTimestamp(theMap);
-      if (timestamp == null) {
-        final String input = theMap.toString();
-        throw new NullPointerException(
-            StringUtils.format(
-                "Null timestamp in input: %s",
-                input.length() < 100 ? input : input.substring(0, 100) + "..."
-            )
-        );
-      }
     }
     catch (Exception e) {
-      throw new ParseException(e, "Unparseable timestamp found! Event: %s", theMap);
+      throw new ParseException(e, "Unparseable timestamp found! Event: %s", rawMapToPrint(theMap));
+    }
+    if (timestamp == null) {
+      throw new ParseException("Unparseable timestamp found! Event: %s", rawMapToPrint(theMap));

Review comment:
       It is a good idea to print the timestamp string, but I would like to keep the current behavior as well (this logging is not something I added in this PR). I modified the error message to include the timestamp.

##########
File path: core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
##########
@@ -93,23 +94,32 @@ public static InputRow parse(
     final DateTime timestamp;
     try {
       timestamp = timestampSpec.extractTimestamp(theMap);
-      if (timestamp == null) {
-        final String input = theMap.toString();
-        throw new NullPointerException(
-            StringUtils.format(
-                "Null timestamp in input: %s",
-                input.length() < 100 ? input : input.substring(0, 100) + "..."
-            )
-        );
-      }
     }
     catch (Exception e) {
-      throw new ParseException(e, "Unparseable timestamp found! Event: %s", theMap);
+      throw new ParseException(e, "Unparseable timestamp found! Event: %s", rawMapToPrint(theMap));
+    }
+    if (timestamp == null) {
+      throw new ParseException("Unparseable timestamp found! Event: %s", rawMapToPrint(theMap));
+    }
+    if (!Intervals.ETERNITY.contains(timestamp)) {
+      throw new ParseException(
+          "Encountered row with timestamp that cannot be represented as a long: [%s]",
+          rawMapToPrint(theMap)
+      );

Review comment:
       Done.
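
The three checks in this hunk (extraction failure, null timestamp, timestamp outside the representable range) can be sketched with `java.time` as follows. This is only an analogy to the diff above: the real code uses `TimestampSpec`, Joda-Time, and `Intervals.ETERNITY`, while `TimestampCheckSketch`, the `"timestamp"` key, and the use of `IllegalArgumentException` in place of `ParseException` are assumptions of this sketch.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.Map;

// Hypothetical illustration of validating an event timestamp in three steps.
class TimestampCheckSketch
{
  static long extractMillis(Map<String, ?> event)
  {
    Object raw = event.get("timestamp");
    if (raw == null) {
      // Missing timestamp: reported the same way as an unparseable one.
      throw new IllegalArgumentException("Unparseable timestamp found! Event: " + event);
    }

    final Instant instant;
    try {
      instant = Instant.parse(raw.toString());
    }
    catch (DateTimeParseException e) {
      throw new IllegalArgumentException("Unparseable timestamp found! Event: " + event, e);
    }

    try {
      // toEpochMilli() throws ArithmeticException when the instant does not fit in a long.
      return instant.toEpochMilli();
    }
    catch (ArithmeticException e) {
      throw new IllegalArgumentException(
          "Encountered row with timestamp that cannot be represented as a long: " + event,
          e
      );
    }
  }
}
```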

##########
File path: extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
##########
@@ -106,6 +103,9 @@
 import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
 import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
 import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;

Review comment:
       That might be nice. I think we probably need new classes for metrics of native batch ingestion. I will reorganize the package in a follow-up PR if that looks good.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -135,6 +148,49 @@ public void stopGracefully(TaskConfig taskConfig)
     }
   }
 
+  public static FilteringCloseableInputRowIterator inputSourceReader(

Review comment:
       Added.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
##########
@@ -151,7 +151,7 @@ private static String makeTaskId(RealtimeAppenderatorIngestionSpec spec)
   private volatile Thread runThread = null;
 
   @JsonIgnore
-  private CircularBuffer<Throwable> savedParseExceptions;
+  private ParseExceptionHandler parseExceptionHandler;

Review comment:
       Added.






[GitHub] [druid] liran-funaro commented on pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on pull request #10336:
URL: https://github.com/apache/druid/pull/10336#issuecomment-690952441


   @jihoonson LGTM.




[GitHub] [druid] jihoonson commented on a change in pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10336:
URL: https://github.com/apache/druid/pull/10336#discussion_r482314355



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
##########
@@ -550,7 +546,9 @@ public Response getUnparseableEvents(
   )
   {
     IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
-    List<String> events = IndexTaskUtils.getMessagesFromSavedParseExceptions(savedParseExceptions);
+    List<String> events = IndexTaskUtils.getMessagesFromSavedParseExceptions(
+        parseExceptionHandler.getSavedParseExceptions()

Review comment:
       This is fine. No API can be called until the `chatHandler` is registered, and the `chatHandler` is registered in `run()` after `parseExceptionHandler` is created.






[GitHub] [druid] jihoonson commented on a change in pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10336:
URL: https://github.com/apache/druid/pull/10336#discussion_r482314655



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
##########
@@ -59,7 +65,9 @@ public static Appenderator newAppenderator(
       TaskToolbox toolbox,
       DataSchema dataSchema,
       AppenderatorConfig appenderatorConfig,
-      DataSegmentPusher segmentPusher
+      DataSegmentPusher segmentPusher,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler

Review comment:
       I see your point, but I'm not sure it's good to make `parseExceptionHandler` a `rowIngestionMeters` provider. I guess we have a couple of alternatives:
   
   - Merge `RowIngestionMeters` and `ParseExceptionHandler`. `RowIngestionMeters` would have a `handle(ParseException)` method and throw an exception when it hits the limit. This seems a bit strange to me, since a metrics registry would then be able to throw an exception.
   - Make `RowIngestionMeters` only a registry and use individual `Meter`s. For example, `ParseExceptionHandler` would have only a `Meter` for counting unparseable events, and `AppenderatorImpl` would have only a `Meter` for counting processed events. This seems sane, but it can make the code error-prone, since we would have multiple `Meter`s and would need to choose the right one every time we use them.
   
   I'm not sure either of these is better than what we have now. Do you have a better idea?
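
For illustration, the arrangement under discussion (a handler that owns the error limit and is handed a meters object that only counts) could be sketched roughly as follows. This is a simplified, hypothetical sketch and not the code in this PR: `SimpleMeters`, `SketchParseExceptionHandler`, and the `maxAllowed` / `maxSaved` parameters are made-up names, and a plain `RuntimeException` stands in for `ParseException`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a metrics registry: it only counts and never throws.
class SimpleMeters
{
  private long processed;
  private long unparseable;

  void incrementProcessed() { processed++; }
  void incrementUnparseable() { unparseable++; }
  long getProcessed() { return processed; }
  long getUnparseable() { return unparseable; }
}

// Hypothetical handler: owns the error limit and the saved messages,
// and reports counts to the meters it was given.
class SketchParseExceptionHandler
{
  private final SimpleMeters meters;
  private final int maxAllowed;
  private final int maxSaved;
  private final Deque<String> saved = new ArrayDeque<>();

  SketchParseExceptionHandler(SimpleMeters meters, int maxAllowed, int maxSaved)
  {
    this.meters = meters;
    this.maxAllowed = maxAllowed;
    this.maxSaved = maxSaved;
  }

  void handle(RuntimeException e)
  {
    meters.incrementUnparseable();
    if (maxSaved > 0) {
      if (saved.size() == maxSaved) {
        saved.removeFirst(); // keep only the most recent messages
      }
      saved.addLast(String.valueOf(e.getMessage()));
    }
    // The handler, not the registry, decides when too many rows have failed.
    if (meters.getUnparseable() > maxAllowed) {
      throw new RuntimeException("Max parse exceptions exceeded", e);
    }
  }

  List<String> getSavedMessages() { return new ArrayList<>(saved); }
}
```

In this shape the registry stays a passive counter, and the decision to fail lives in exactly one place.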

##########
File path: server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
##########
@@ -129,7 +137,9 @@ public Appenderator createOfflineAppenderatorForTask(
           dataSegmentPusher,
           objectMapper,
           indexIO,
-          indexMerger
+          indexMerger,
+          rowIngestionMeters,
+          parseExceptionHandler

Review comment:
       They must not be null. I added null checks in `AppenderatorImpl`.
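
A minimal sketch of this kind of constructor-time null check, assuming `java.util.Objects.requireNonNull`; the actual checks in `AppenderatorImpl` may use a different utility. `SketchAppenderator` and the `Object`-typed fields are hypothetical simplifications.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for a class that requires both collaborators.
class SketchAppenderator
{
  private final Object rowIngestionMeters;
  private final Object parseExceptionHandler;

  SketchAppenderator(Object rowIngestionMeters, Object parseExceptionHandler)
  {
    // Fail at construction time with the argument name in the message,
    // instead of failing later on first use.
    this.rowIngestionMeters = Objects.requireNonNull(rowIngestionMeters, "rowIngestionMeters");
    this.parseExceptionHandler = Objects.requireNonNull(parseExceptionHandler, "parseExceptionHandler");
  }
}
```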

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
##########
@@ -186,7 +185,7 @@ protected AddToFactsResult addToFacts(
       } else {
         // We lost a race
         aggs = concurrentGet(prev);
-        parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
+        doAggregate(metrics, aggs, rowContainer, row, parseExceptionMessages);

Review comment:
       Nice find. `parseExceptionMessages` is now cleared.
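
A sketch of why the clear matters, assuming the concern was that messages collected during a discarded first attempt would otherwise be reported twice once the same output list is reused for the retry. `SketchAggregation`, `doAggregate`, and `aggregateWithRetry` here are hypothetical simplifications, not the `OnheapIncrementalIndex` code.

```java
import java.util.ArrayList;
import java.util.List;

class SketchAggregation
{
  // Hypothetical aggregate step: appends a message for every value that fails to parse.
  static void doAggregate(List<String> values, List<String> parseExceptionMessages)
  {
    for (String value : values) {
      try {
        Long.parseLong(value);
      }
      catch (NumberFormatException e) {
        parseExceptionMessages.add("Unparseable value[" + value + "]");
      }
    }
  }

  // Runs the aggregation twice, as happens when the first attempt loses a race.
  static List<String> aggregateWithRetry(List<String> values)
  {
    List<String> parseExceptionMessages = new ArrayList<>();
    doAggregate(values, parseExceptionMessages); // first attempt, whose result is discarded
    parseExceptionMessages.clear();              // drop messages from the discarded attempt
    doAggregate(values, parseExceptionMessages); // retry against the winning entry
    return parseExceptionMessages;
  }
}
```

Without the `clear()` call, each bad value would appear twice in the returned list.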

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
##########
@@ -233,16 +232,13 @@ protected AddToFactsResult addToFacts(
         }
         catch (ParseException e) {
           // "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
-          if (getReportParseExceptions()) {
-            throw new ParseException(e, "Encountered parse error for aggregator[%s]", getMetricAggs()[i].getName());
-          } else {
-            log.debug(e, "Encountered parse error, skipping aggregator[%s].", getMetricAggs()[i].getName());
-          }
+          log.debug(e, "Encountered parse error, skipping aggregator[%s].", getMetricAggs()[i].getName());
+          parseExceptionMessages.add(e.getMessage());

Review comment:
       This is intentional, to make it consistent with `OnheapIncrementalIndex`.

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/NoopRowIngestionMeters.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class NoopRowIngestionMeters implements RowIngestionMeters

Review comment:
       This is used only in `RealtimeIndexTask`, which is used by `Tranquility`. It could be a singleton, but that doesn't seem to matter.






[GitHub] [druid] jihoonson commented on pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #10336:
URL: https://github.com/apache/druid/pull/10336#issuecomment-691353689


   @abhishekagarwal87 @liran-funaro thanks for the review. 




[GitHub] [druid] suneet-s commented on pull request #10336: More structured way to handle parse exceptions

Posted by GitBox <gi...@apache.org>.
suneet-s commented on pull request #10336:
URL: https://github.com/apache/druid/pull/10336#issuecomment-687199069


   > ```
   > 279 L                   rowIngestionMeters.incrementProcessed();
   > 280 F | L | B(0/2)    } else if (addResult.hasParseException()) {
   > 281 L                   parseExceptionHandler.handle(addResult.getParseException());
   > 282 F                 }
   > ```
   > 
   > The test coverage bot is failing because of the missing tests for these branches, but they are being covered by `IndexTaskTest` which is in `indexing-service`.
   
   Oops, sorry about my previous comment. How did you get code coverage to show you that these branches were covered? I tried running this locally, but it doesn't show me code coverage. This is what I see in the `indexing modules` tests:
   
   ```
   Diff coverage statistics:
   ------------------------------------------------------------------------------
   |     lines      |    branches    |   functions    |   path
   ------------------------------------------------------------------------------
   | 100% (1/1)     | 100% (0/0)     | 100% (1/1)     | org/apache/druid/indexing/common/stats/DropwizardRowIngestionMeters.java
   | 100% (6/6)     | 100% (0/0)     | 100% (7/7)     | org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
   | 100% (18/18)   | 100% (4/4)     | 100% (16/16)   | org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
   |  80% (25/31)   |  70% (7/10)    |  93% (27/29)   | org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
   |   0% (0/45)    |   0% (0/16)    |  25% (2/8)     | org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
   |   0% (0/1)     | 100% (0/0)     | 100% (1/1)     | org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
   |  88% (15/17)   |  66% (4/6)     |  48% (12/25)   | org/apache/druid/indexing/seekablestream/StreamChunkParser.java
   | 100% (14/14)   | 100% (2/2)     |  85% (24/28)   | org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
   |  89% (17/19)   |  75% (6/8)     |  89% (33/37)   | org/apache/druid/indexing/common/task/InputSourceProcessor.java
   | 100% (2/2)     | 100% (0/0)     | 100% (11/11)   | org/apache/druid/indexing/common/task/BatchAppenderators.java
   |  90% (36/40)   |  85% (12/14)   |  83% (10/12)   | org/apache/druid/indexing/common/task/IndexTask.java
   |  95% (21/22)   |  90% (9/10)    |  94% (51/54)   | org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java
   |  80% (8/10)    |  50% (1/2)     |  80% (8/10)    | org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
   | 100% (4/4)     | 100% (0/0)     |   0% (0/1)     | org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
   ------------------------------------------------------------------------------
   
   ```



