You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by su...@apache.org on 2021/08/09 19:41:08 UTC
[druid] branch master updated: fix parse exception handling for stream parsers (#11556)
This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f2ac6cd fix parse exception handling for stream parsers (#11556)
f2ac6cd is described below
commit f2ac6cd96edfacca6ba7880d5eb9c532ea0b95f2
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Mon Aug 9 12:40:44 2021 -0700
fix parse exception handling for stream parsers (#11556)
* fix parse exception handling
* fix style and inspections
---
.../task/FilteringCloseableInputRowIterator.java | 21 +--
.../FilteringCloseableInputRowIteratorTest.java | 147 ++++++++++++++++-----
2 files changed, 124 insertions(+), 44 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java
index 6a88a34..30af8fe 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java
@@ -59,16 +59,21 @@ public class FilteringCloseableInputRowIterator implements CloseableIterator<Inp
@Override
public boolean hasNext()
{
- while (next == null && delegate.hasNext()) {
+ while (true) {
try {
- // delegate.next() can throw ParseException
- final InputRow row = delegate.next();
- // filter.test() can throw ParseException
- if (filter.test(row)) {
- next = row;
- } else {
- rowIngestionMeters.incrementThrownAway();
+ // delegate.hasNext() can throw ParseException, since some types of delegating iterators will call next on
+ // their underlying iterator
+ while (next == null && delegate.hasNext()) {
+ // delegate.next() can throw ParseException
+ final InputRow row = delegate.next();
+ // filter.test() can throw ParseException
+ if (filter.test(row)) {
+ next = row;
+ } else {
+ rowIngestionMeters.incrementThrownAway();
+ }
}
+ break;
}
catch (ParseException e) {
parseExceptionHandler.handle(e);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java
index b26c736..f6c4299 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java
@@ -46,6 +46,14 @@ import java.util.stream.Collectors;
public class FilteringCloseableInputRowIteratorTest
{
private static final List<String> DIMENSIONS = ImmutableList.of("dim1", "dim2");
+ private static final List<InputRow> ROWS = ImmutableList.of(
+ newRow(DateTimes.of("2020-01-01"), 10, 200),
+ newRow(DateTimes.of("2020-01-01"), 10, 400),
+ newRow(DateTimes.of("2020-01-01"), 20, 400),
+ newRow(DateTimes.of("2020-01-01"), 10, 800),
+ newRow(DateTimes.of("2020-01-01"), 30, 200),
+ newRow(DateTimes.of("2020-01-01"), 10, 300)
+ );
private RowIngestionMeters rowIngestionMeters;
private ParseExceptionHandler parseExceptionHandler;
@@ -65,17 +73,9 @@ public class FilteringCloseableInputRowIteratorTest
@Test
public void testFilterOutRows()
{
- final List<InputRow> rows = ImmutableList.of(
- newRow(DateTimes.of("2020-01-01"), 10, 200),
- newRow(DateTimes.of("2020-01-01"), 10, 400),
- newRow(DateTimes.of("2020-01-01"), 20, 400),
- newRow(DateTimes.of("2020-01-01"), 10, 800),
- newRow(DateTimes.of("2020-01-01"), 30, 200),
- newRow(DateTimes.of("2020-01-01"), 10, 300)
- );
final Predicate<InputRow> filter = row -> (Integer) row.getRaw("dim1") == 10;
final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
- CloseableIterators.withEmptyBaggage(rows.iterator()),
+ CloseableIterators.withEmptyBaggage(ROWS.iterator()),
filter,
rowIngestionMeters,
parseExceptionHandler
@@ -83,7 +83,7 @@ public class FilteringCloseableInputRowIteratorTest
final List<InputRow> filteredRows = new ArrayList<>();
rowIterator.forEachRemaining(filteredRows::add);
Assert.assertEquals(
- rows.stream().filter(filter).collect(Collectors.toList()),
+ ROWS.stream().filter(filter).collect(Collectors.toList()),
filteredRows
);
Assert.assertEquals(2, rowIngestionMeters.getThrownAway());
@@ -92,19 +92,10 @@ public class FilteringCloseableInputRowIteratorTest
@Test
public void testParseExceptionInDelegateNext()
{
- final List<InputRow> rows = ImmutableList.of(
- newRow(DateTimes.of("2020-01-01"), 10, 200),
- newRow(DateTimes.of("2020-01-01"), 10, 400),
- newRow(DateTimes.of("2020-01-01"), 20, 400),
- newRow(DateTimes.of("2020-01-01"), 10, 800),
- newRow(DateTimes.of("2020-01-01"), 30, 200),
- newRow(DateTimes.of("2020-01-01"), 10, 300)
- );
-
// This iterator throws ParseException every other call to next().
final CloseableIterator<InputRow> parseExceptionThrowingIterator = new CloseableIterator<InputRow>()
{
- final int numRowsToIterate = rows.size() * 2;
+ final int numRowsToIterate = ROWS.size() * 2;
int nextIdx = 0;
@Override
@@ -118,7 +109,7 @@ public class FilteringCloseableInputRowIteratorTest
{
final int currentIdx = nextIdx++;
if (currentIdx % 2 == 0) {
- return rows.get(currentIdx / 2);
+ return ROWS.get(currentIdx / 2);
} else {
throw new ParseException("Parse exception at ", currentIdx);
}
@@ -139,24 +130,16 @@ public class FilteringCloseableInputRowIteratorTest
final List<InputRow> filteredRows = new ArrayList<>();
rowIterator.forEachRemaining(filteredRows::add);
- Assert.assertEquals(rows, filteredRows);
- Assert.assertEquals(rows.size(), rowIngestionMeters.getUnparseable());
+ Assert.assertEquals(ROWS, filteredRows);
+ Assert.assertEquals(ROWS.size(), rowIngestionMeters.getUnparseable());
}
@Test
public void testParseExceptionInPredicateTest()
{
- final List<InputRow> rows = ImmutableList.of(
- newRow(DateTimes.of("2020-01-01"), 10, 200),
- newRow(DateTimes.of("2020-01-01"), 10, 400),
- newRow(DateTimes.of("2020-01-01"), 20, 400),
- newRow(DateTimes.of("2020-01-01"), 10, 800),
- newRow(DateTimes.of("2020-01-01"), 30, 200),
- newRow(DateTimes.of("2020-01-01"), 10, 300)
- );
final CloseableIterator<InputRow> parseExceptionThrowingIterator = CloseableIterators.withEmptyBaggage(
- rows.iterator()
+ ROWS.iterator()
);
// This filter throws ParseException every other call to test().
final Predicate<InputRow> filter = new Predicate<InputRow>()
@@ -186,12 +169,104 @@ public class FilteringCloseableInputRowIteratorTest
final List<InputRow> filteredRows = new ArrayList<>();
rowIterator.forEachRemaining(filteredRows::add);
final List<InputRow> expectedRows = ImmutableList.of(
- rows.get(0),
- rows.get(2),
- rows.get(4)
+ ROWS.get(0),
+ ROWS.get(2),
+ ROWS.get(4)
);
Assert.assertEquals(expectedRows, filteredRows);
- Assert.assertEquals(rows.size() - expectedRows.size(), rowIngestionMeters.getUnparseable());
+ Assert.assertEquals(ROWS.size() - expectedRows.size(), rowIngestionMeters.getUnparseable());
+ }
+
+ @Test
+ public void testParseExceptionInDelegateHasNext()
+ {
+ // This iterator throws ParseException every other call to hasNext().
+ final CloseableIterator<InputRow> parseExceptionThrowingIterator = new CloseableIterator<InputRow>()
+ {
+ final int numRowsToIterate = ROWS.size() * 2;
+ int currentIndex = 0;
+ int nextIndex = 0;
+
+ @Override
+ public boolean hasNext()
+ {
+ currentIndex = nextIndex++;
+ if (currentIndex % 2 == 0) {
+ return currentIndex < numRowsToIterate;
+ } else {
+ throw new ParseException("Parse exception at ", currentIndex);
+ }
+ }
+
+ @Override
+ public InputRow next()
+ {
+ return ROWS.get(currentIndex / 2);
+ }
+
+ @Override
+ public void close()
+ {
+ }
+ };
+
+ final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
+ parseExceptionThrowingIterator,
+ row -> true,
+ rowIngestionMeters,
+ parseExceptionHandler
+ );
+
+ final List<InputRow> filteredRows = new ArrayList<>();
+ rowIterator.forEachRemaining(filteredRows::add);
+ Assert.assertEquals(ROWS, filteredRows);
+ Assert.assertEquals(ROWS.size(), rowIngestionMeters.getUnparseable());
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testNonParseExceptionInDelegateHasNext()
+ {
+ // This iterator throws a non-parse RuntimeException every other call to hasNext().
+ final CloseableIterator<InputRow> parseExceptionThrowingIterator = new CloseableIterator<InputRow>()
+ {
+ final int numRowsToIterate = ROWS.size() * 2;
+ int currentIndex = 0;
+ int nextIndex = 0;
+
+ @Override
+ public boolean hasNext()
+ {
+ currentIndex = nextIndex++;
+ if (currentIndex % 2 == 0) {
+ return currentIndex < numRowsToIterate;
+ } else {
+ throw new RuntimeException("should explode");
+ }
+ }
+
+ @Override
+ public InputRow next()
+ {
+ return ROWS.get(currentIndex / 2);
+ }
+
+ @Override
+ public void close()
+ {
+ }
+ };
+
+ final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
+ parseExceptionThrowingIterator,
+ row -> true,
+ rowIngestionMeters,
+ parseExceptionHandler
+ );
+
+ while (rowIterator.hasNext()) {
+ rowIterator.next();
+ }
+ Assert.fail("you never should have come here");
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org