Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/11/09 18:48:33 UTC

[GitHub] [druid] zachjsh opened a new pull request #11898: Can reject row if parsing error

zachjsh opened a new pull request #11898:
URL: https://github.com/apache/druid/pull/11898


   ### Description
   
   Added a `rejectRowIfParsingError` parameter to `tuningConfig`. When enabled, any row found to have parsing errors during ingestion is discarded, instead of the existing behavior of retaining the row and filling the column(s) that caused the parsing error with null values. The new parameter is disabled by default, which retains the existing behavior.
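
A minimal sketch of the two behaviors described above, assuming a simplified row model in which every column is parsed as a long. The class `RejectRowSketch` and the method `ingest` are hypothetical and are not part of Druid; only the flag name `rejectRowIfParsingError` comes from this PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; this is not Druid's ingestion code.
class RejectRowSketch {
  /**
   * Parses every column of every raw row as a long.
   * A column that fails to parse is stored as null (the existing behavior).
   * If rejectRowIfParsingError is true, a row with any failed column is dropped instead.
   */
  static List<Map<String, Long>> ingest(
      List<Map<String, String>> rawRows,
      boolean rejectRowIfParsingError
  ) {
    List<Map<String, Long>> accepted = new ArrayList<>();
    for (Map<String, String> raw : rawRows) {
      Map<String, Long> parsed = new LinkedHashMap<>();
      boolean hadParseError = false;
      for (Map.Entry<String, String> column : raw.entrySet()) {
        try {
          parsed.put(column.getKey(), Long.parseLong(column.getValue()));
        } catch (NumberFormatException e) {
          hadParseError = true;
          parsed.put(column.getKey(), null); // null-fill the offending column
        }
      }
      if (hadParseError && rejectRowIfParsingError) {
        continue; // discard the whole row
      }
      accepted.add(parsed);
    }
    return accepted;
  }
}
```

With the flag set to `false`, a row containing an unparseable value is kept with that column set to null; with the flag set to `true`, the same row is absent from the result.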
   
   This PR has:
   - [x] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [x] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] zachjsh commented on pull request #11898: Can reject row if parsing error

Posted by GitBox <gi...@apache.org>.
zachjsh commented on pull request #11898:
URL: https://github.com/apache/druid/pull/11898#issuecomment-964514311


   Note: still need to add some more tests, docs, and javadocs




[GitHub] [druid] liran-funaro commented on a change in pull request #11898: Can reject row if parsing error

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #11898:
URL: https://github.com/apache/druid/pull/11898#discussion_r798378297



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
##########
@@ -150,26 +153,36 @@ protected void initAggs(
   @Override
   protected AddToFactsResult addToFacts(
       InputRow row,
-      IncrementalIndexRow key,
+      IncrementalIndexRowResult incrementalIndexRowResult,
       ThreadLocal<InputRow> rowContainer,
       Supplier<InputRow> rowSupplier,
       boolean skipMaxRowsInMemoryCheck
   ) throws IndexSizeExceededException
   {
+    IncrementalIndexRow key = incrementalIndexRowResult.getIncrementalIndexRow();
     final List<String> parseExceptionMessages = new ArrayList<>();
     final int priorIndex = facts.getPriorIndex(key);
 
     Aggregator[] aggs;
     final AggregatorFactory[] metrics = getMetrics();
     final AtomicInteger numEntries = getNumEntries();
     final AtomicLong sizeInBytes = getBytesInMemory();
+    if (shouldRowBeRejected(incrementalIndexRowResult.getParseExceptionMessages().size())) {
+      return new AddToFactsResult(numEntries.get(), sizeInBytes.get(), parseExceptionMessages);
+    }
     if (IncrementalIndexRow.EMPTY_ROW_INDEX != priorIndex) {
       aggs = concurrentGet(priorIndex);
       doAggregate(metrics, aggs, rowContainer, row, parseExceptionMessages);
+      if (shouldRowBeRejected(parseExceptionMessages.size())) {
+        return new AddToFactsResult(numEntries.get(), sizeInBytes.get(), parseExceptionMessages);
+      }

Review comment:
       Isn't it too late to reject the row at this point? It has already been aggregated into the existing row.
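
The concern can be illustrated with a minimal sketch, assuming a simplified model in which each key maps to an array of running sums. The class `RejectAfterAggregateSketch` and both methods are hypothetical stand-ins, not Druid's `Aggregator` API, and the second method is only one possible ordering, not a claim about how the PR should be changed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; this is not Druid's OnheapIncrementalIndex.
class RejectAfterAggregateSketch {
  // Running sums per key, standing in for the per-row aggregators of a real index.
  final Map<String, long[]> sums = new HashMap<>();

  /** Aggregates while parsing, then decides. Returns false if the row is "rejected". */
  boolean addAggregateThenCheck(String key, String[] metrics) {
    long[] agg = sums.computeIfAbsent(key, k -> new long[metrics.length]);
    int parseErrors = 0;
    for (int i = 0; i < metrics.length; i++) {
      try {
        agg[i] += Long.parseLong(metrics[i]); // shared state is mutated here
      } catch (NumberFormatException e) {
        parseErrors++;
      }
    }
    // Returning false does not undo the additions already made above.
    return parseErrors == 0;
  }

  /** Parses every metric first and only touches shared state if all of them parsed. */
  boolean addCheckThenAggregate(String key, String[] metrics) {
    long[] parsed = new long[metrics.length];
    for (int i = 0; i < metrics.length; i++) {
      try {
        parsed[i] = Long.parseLong(metrics[i]);
      } catch (NumberFormatException e) {
        return false; // rejected before any aggregation happened
      }
    }
    long[] agg = sums.computeIfAbsent(key, k -> new long[metrics.length]);
    for (int i = 0; i < metrics.length; i++) {
      agg[i] += parsed[i];
    }
    return true;
  }
}
```

In the first method, a row whose second metric fails to parse is reported as rejected, yet its first metric has already been added to the existing sums. In the second method, the same row leaves the sums untouched.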



