You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by su...@apache.org on 2021/08/09 19:41:08 UTC

[druid] branch master updated: fix parse exception handling for stream parsers (#11556)

This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f2ac6cd  fix parse exception handling for stream parsers (#11556)
f2ac6cd is described below

commit f2ac6cd96edfacca6ba7880d5eb9c532ea0b95f2
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Mon Aug 9 12:40:44 2021 -0700

    fix parse exception handling for stream parsers (#11556)
    
    * fix parse exception handling
    
    * fix style and inspections
---
 .../task/FilteringCloseableInputRowIterator.java   |  21 +--
 .../FilteringCloseableInputRowIteratorTest.java    | 147 ++++++++++++++++-----
 2 files changed, 124 insertions(+), 44 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java
index 6a88a34..30af8fe 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java
@@ -59,16 +59,21 @@ public class FilteringCloseableInputRowIterator implements CloseableIterator<Inp
   @Override
   public boolean hasNext()
   {
-    while (next == null && delegate.hasNext()) {
+    while (true) {
       try {
-        // delegate.next() can throw ParseException
-        final InputRow row = delegate.next();
-        // filter.test() can throw ParseException
-        if (filter.test(row)) {
-          next = row;
-        } else {
-          rowIngestionMeters.incrementThrownAway();
+        // delegate.hasNext() can throw ParseException, since some types of delegating iterators will call next on
+        // their underlying iterator
+        while (next == null && delegate.hasNext()) {
+          // delegate.next() can throw ParseException
+          final InputRow row = delegate.next();
+          // filter.test() can throw ParseException
+          if (filter.test(row)) {
+            next = row;
+          } else {
+            rowIngestionMeters.incrementThrownAway();
+          }
         }
+        break;
       }
       catch (ParseException e) {
         parseExceptionHandler.handle(e);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java
index b26c736..f6c4299 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java
@@ -46,6 +46,14 @@ import java.util.stream.Collectors;
 public class FilteringCloseableInputRowIteratorTest
 {
   private static final List<String> DIMENSIONS = ImmutableList.of("dim1", "dim2");
+  private static final List<InputRow> ROWS = ImmutableList.of(
+      newRow(DateTimes.of("2020-01-01"), 10, 200),
+      newRow(DateTimes.of("2020-01-01"), 10, 400),
+      newRow(DateTimes.of("2020-01-01"), 20, 400),
+      newRow(DateTimes.of("2020-01-01"), 10, 800),
+      newRow(DateTimes.of("2020-01-01"), 30, 200),
+      newRow(DateTimes.of("2020-01-01"), 10, 300)
+  );
 
   private RowIngestionMeters rowIngestionMeters;
   private ParseExceptionHandler parseExceptionHandler;
@@ -65,17 +73,9 @@ public class FilteringCloseableInputRowIteratorTest
   @Test
   public void testFilterOutRows()
   {
-    final List<InputRow> rows = ImmutableList.of(
-        newRow(DateTimes.of("2020-01-01"), 10, 200),
-        newRow(DateTimes.of("2020-01-01"), 10, 400),
-        newRow(DateTimes.of("2020-01-01"), 20, 400),
-        newRow(DateTimes.of("2020-01-01"), 10, 800),
-        newRow(DateTimes.of("2020-01-01"), 30, 200),
-        newRow(DateTimes.of("2020-01-01"), 10, 300)
-    );
     final Predicate<InputRow> filter = row -> (Integer) row.getRaw("dim1") == 10;
     final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
-        CloseableIterators.withEmptyBaggage(rows.iterator()),
+        CloseableIterators.withEmptyBaggage(ROWS.iterator()),
         filter,
         rowIngestionMeters,
         parseExceptionHandler
@@ -83,7 +83,7 @@ public class FilteringCloseableInputRowIteratorTest
     final List<InputRow> filteredRows = new ArrayList<>();
     rowIterator.forEachRemaining(filteredRows::add);
     Assert.assertEquals(
-        rows.stream().filter(filter).collect(Collectors.toList()),
+        ROWS.stream().filter(filter).collect(Collectors.toList()),
         filteredRows
     );
     Assert.assertEquals(2, rowIngestionMeters.getThrownAway());
@@ -92,19 +92,10 @@ public class FilteringCloseableInputRowIteratorTest
   @Test
   public void testParseExceptionInDelegateNext()
   {
-    final List<InputRow> rows = ImmutableList.of(
-        newRow(DateTimes.of("2020-01-01"), 10, 200),
-        newRow(DateTimes.of("2020-01-01"), 10, 400),
-        newRow(DateTimes.of("2020-01-01"), 20, 400),
-        newRow(DateTimes.of("2020-01-01"), 10, 800),
-        newRow(DateTimes.of("2020-01-01"), 30, 200),
-        newRow(DateTimes.of("2020-01-01"), 10, 300)
-    );
-
     // This iterator throws ParseException every other call to next().
     final CloseableIterator<InputRow> parseExceptionThrowingIterator = new CloseableIterator<InputRow>()
     {
-      final int numRowsToIterate = rows.size() * 2;
+      final int numRowsToIterate = ROWS.size() * 2;
       int nextIdx = 0;
 
       @Override
@@ -118,7 +109,7 @@ public class FilteringCloseableInputRowIteratorTest
       {
         final int currentIdx = nextIdx++;
         if (currentIdx % 2 == 0) {
-          return rows.get(currentIdx / 2);
+          return ROWS.get(currentIdx / 2);
         } else {
           throw new ParseException("Parse exception at ", currentIdx);
         }
@@ -139,24 +130,16 @@ public class FilteringCloseableInputRowIteratorTest
 
     final List<InputRow> filteredRows = new ArrayList<>();
     rowIterator.forEachRemaining(filteredRows::add);
-    Assert.assertEquals(rows, filteredRows);
-    Assert.assertEquals(rows.size(), rowIngestionMeters.getUnparseable());
+    Assert.assertEquals(ROWS, filteredRows);
+    Assert.assertEquals(ROWS.size(), rowIngestionMeters.getUnparseable());
   }
 
   @Test
   public void testParseExceptionInPredicateTest()
   {
-    final List<InputRow> rows = ImmutableList.of(
-        newRow(DateTimes.of("2020-01-01"), 10, 200),
-        newRow(DateTimes.of("2020-01-01"), 10, 400),
-        newRow(DateTimes.of("2020-01-01"), 20, 400),
-        newRow(DateTimes.of("2020-01-01"), 10, 800),
-        newRow(DateTimes.of("2020-01-01"), 30, 200),
-        newRow(DateTimes.of("2020-01-01"), 10, 300)
-    );
 
     final CloseableIterator<InputRow> parseExceptionThrowingIterator = CloseableIterators.withEmptyBaggage(
-        rows.iterator()
+        ROWS.iterator()
     );
     // This filter throws ParseException every other call to test().
     final Predicate<InputRow> filter = new Predicate<InputRow>()
@@ -186,12 +169,104 @@ public class FilteringCloseableInputRowIteratorTest
     final List<InputRow> filteredRows = new ArrayList<>();
     rowIterator.forEachRemaining(filteredRows::add);
     final List<InputRow> expectedRows = ImmutableList.of(
-        rows.get(0),
-        rows.get(2),
-        rows.get(4)
+        ROWS.get(0),
+        ROWS.get(2),
+        ROWS.get(4)
     );
     Assert.assertEquals(expectedRows, filteredRows);
-    Assert.assertEquals(rows.size() - expectedRows.size(), rowIngestionMeters.getUnparseable());
+    Assert.assertEquals(ROWS.size() - expectedRows.size(), rowIngestionMeters.getUnparseable());
+  }
+
+  @Test
+  public void testParseExceptionInDelegateHasNext()
+  {
+    // This iterator throws ParseException every other call to hasNext().
+    final CloseableIterator<InputRow> parseExceptionThrowingIterator = new CloseableIterator<InputRow>()
+    {
+      final int numRowsToIterate = ROWS.size() * 2;
+      int currentIndex = 0;
+      int nextIndex = 0;
+
+      @Override
+      public boolean hasNext()
+      {
+        currentIndex = nextIndex++;
+        if (currentIndex % 2 == 0) {
+          return currentIndex < numRowsToIterate;
+        } else {
+          throw new ParseException("Parse exception at ", currentIndex);
+        }
+      }
+
+      @Override
+      public InputRow next()
+      {
+        return ROWS.get(currentIndex / 2);
+      }
+
+      @Override
+      public void close()
+      {
+      }
+    };
+
+    final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
+        parseExceptionThrowingIterator,
+        row -> true,
+        rowIngestionMeters,
+        parseExceptionHandler
+    );
+
+    final List<InputRow> filteredRows = new ArrayList<>();
+    rowIterator.forEachRemaining(filteredRows::add);
+    Assert.assertEquals(ROWS, filteredRows);
+    Assert.assertEquals(ROWS.size(), rowIngestionMeters.getUnparseable());
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testNonParseExceptionInDelegateHasNext()
+  {
+    // This iterator throws a plain RuntimeException (not a ParseException) every other call to hasNext().
+    final CloseableIterator<InputRow> parseExceptionThrowingIterator = new CloseableIterator<InputRow>()
+    {
+      final int numRowsToIterate = ROWS.size() * 2;
+      int currentIndex = 0;
+      int nextIndex = 0;
+
+      @Override
+      public boolean hasNext()
+      {
+        currentIndex = nextIndex++;
+        if (currentIndex % 2 == 0) {
+          return currentIndex < numRowsToIterate;
+        } else {
+          throw new RuntimeException("should explode");
+        }
+      }
+
+      @Override
+      public InputRow next()
+      {
+        return ROWS.get(currentIndex / 2);
+      }
+
+      @Override
+      public void close()
+      {
+      }
+    };
+
+    final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
+        parseExceptionThrowingIterator,
+        row -> true,
+        rowIngestionMeters,
+        parseExceptionHandler
+    );
+
+    while (rowIterator.hasNext()) {
+      rowIterator.next();
+    }
+    Assert.fail("you never should have come here");
   }
 
   @Test

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org