Posted to issues@flink.apache.org by "Kezhu Wang (Jira)" <ji...@apache.org> on 2020/10/19 14:49:00 UTC
[jira] [Created] (FLINK-19717) SourceReaderBase.pollNext may return
END_OF_INPUT if SplitReader.fetch throws
Kezhu Wang created FLINK-19717:
----------------------------------
Summary: SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
Key: FLINK-19717
URL: https://issues.apache.org/jira/browse/FLINK-19717
Project: Flink
Issue Type: Bug
Components: Connectors / Common
Affects Versions: 1.12.0
Reporter: Kezhu Wang
Here is a hypothetical execution flow:
1. In the mailbox thread, we enter {{SourceReaderBase.getNextFetch}}. After {{splitFetcherManager.checkErrors()}} executes but before {{elementsQueue.poll()}}, the {{SplitFetcher}} gets its chance to run.
2. The {{SplitFetcher}} terminates due to an exception from {{SplitReader.fetch}}. {{SplitFetcher.shutdownHook}} removes this failed fetcher from {{SplitFetcherManager}}.
3. In the mailbox thread, {{elementsQueue.poll()}} executes. If there are no elements in the queue, {{elementsQueue}} is reset to unavailable.
4. After getting no elements from {{SourceReaderBase.getNextFetch}}, we enter {{SourceReaderBase.finishedOrAvailableLater}}. If the failed fetcher was the last alive fetcher, {{SourceReaderBase.finishedOrAvailableLater}} may evaluate to {{InputStatus.END_OF_INPUT}}.
5. {{StreamTask}} terminates itself due to {{InputStatus.END_OF_INPUT}}.
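The interleaving above can be reproduced deterministically with a small single-threaded model. This is only a sketch under assumptions: the class {{PollNextRaceSketch}}, its fields, and the {{Runnable}} hook are hypothetical stand-ins and not Flink code, and the end-of-input condition is a simplified reading of {{finishedOrAvailableLater}}.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical single-threaded model of the race described above; not Flink code. */
public class PollNextRaceSketch {

    public enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    // Simplified stand-ins for the reader state (illustrative names only).
    private final Queue<String> elementsQueue = new ArrayDeque<>();
    private final AtomicReference<Throwable> fetcherError = new AtomicReference<>();
    private int aliveFetchers = 1;
    private final boolean noMoreSplitsAssignment = true;

    /** Step 2: the fetcher fails, records its error, and removes itself. */
    public void fetcherFails() {
        fetcherError.set(new RuntimeException("Testing Exception"));
        aliveFetchers--;
    }

    /** Models checkErrors(): rethrows a recorded fetcher failure, if any. */
    public void checkErrors() {
        Throwable t = fetcherError.get();
        if (t != null) {
            throw new RuntimeException("One or more fetchers have encountered exception", t);
        }
    }

    /** Models pollNext(); the hook runs between the error check and the queue poll. */
    public Status pollNext(Runnable betweenCheckAndPoll) {
        checkErrors();                       // step 1: no error has been recorded yet
        betweenCheckAndPoll.run();           // step 2: the fetcher thread runs here
        String next = elementsQueue.poll();  // step 3: the queue is empty
        if (next != null) {
            return Status.MORE_AVAILABLE;
        }
        // Step 4: simplified finishedOrAvailableLater().
        boolean allFetchersShutDown = aliveFetchers == 0;
        if (noMoreSplitsAssignment && allFetchersShutDown && elementsQueue.isEmpty()) {
            return Status.END_OF_INPUT;      // the failure is silently swallowed
        }
        return Status.NOTHING_AVAILABLE;
    }

    public static void main(String[] args) {
        PollNextRaceSketch reader = new PollNextRaceSketch();
        // The fetcher fails after the error check, so the poll reports END_OF_INPUT.
        System.out.println(reader.pollNext(reader::fetcherFails));
    }
}
```

In this model, a failure that lands before {{checkErrors()}} surfaces as an exception, while the same failure landing just after it is reported as a clean end of input. That matches the window the steps describe.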
Here is a revised {{SourceReaderBaseTest.testExceptionInSplitReader}} that fails roughly half of the time.
{code:java}
@Test
public void testExceptionInSplitReader() throws Exception {
    expectedException.expect(RuntimeException.class);
    expectedException.expectMessage("One or more fetchers have encountered exception");
    final String errMsg = "Testing Exception";

    FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
        new FutureCompletingBlockingQueue<>();
    // We have to handle split changes first, otherwise fetch will not be called.
    try (MockSourceReader reader = new MockSourceReader(
            elementsQueue,
            () -> new SplitReader<int[], MockSourceSplit>() {
                @Override
                public RecordsWithSplitIds<int[]> fetch() {
                    throw new RuntimeException(errMsg);
                }

                @Override
                public void handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChanges) {}

                @Override
                public void wakeUp() {
                }
            },
            getConfig(),
            null)) {
        ValidatingSourceOutput output = new ValidatingSourceOutput();
        reader.addSplits(Collections.singletonList(getSplit(0,
            NUM_RECORDS_PER_SPLIT,
            Boundedness.CONTINUOUS_UNBOUNDED)));
        reader.handleSourceEvents(new NoMoreSplitsEvent());
        // This is not a real infinite loop, it is supposed to throw exception after some polls.
        while (true) {
            InputStatus inputStatus = reader.pollNext(output);
            assertNotEquals(InputStatus.END_OF_INPUT, inputStatus);
            // Add a sleep to avoid tight loop.
            Thread.sleep(0);
        }
    }
}
{code}
This revised {{SourceReaderBaseTest.testExceptionInSplitReader}} differs from the existing one in three places:
1. {{reader.handleSourceEvents(new NoMoreSplitsEvent())}} sets {{SourceReaderBase.noMoreSplitsAssignment}} to true.
2. An assertion is added to check that {{reader.pollNext}} does not return {{InputStatus.END_OF_INPUT}}.
3. {{Thread.sleep(1)}} is changed to {{Thread.sleep(0)}}, which increases the failure rate from about 1/200 to about 1/2.
See [FLINK-19448|https://issues.apache.org/jira/browse/FLINK-19448] for initial discussion.