Posted to issues@flink.apache.org by "Robert Metzger (Jira)" <ji...@apache.org> on 2020/12/08 06:40:09 UTC

[jira] [Updated] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws

     [ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Robert Metzger updated FLINK-19717:
-----------------------------------
    Fix Version/s:     (was: 1.12.0)
                   1.13.0

> SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-19717
>                 URL: https://issues.apache.org/jira/browse/FLINK-19717
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.12.0
>            Reporter: Kezhu Wang
>            Assignee: Kezhu Wang
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.11.3, 1.13.0
>
>
> Here is the execution flow I have in mind:
> 1. In the mailbox thread, we enter {{SourceReaderBase.getNextFetch}}. After it executes {{splitFetcherManager.checkErrors()}} but before {{elementsQueue.poll()}}, the {{SplitFetcher}} gets its chance to run.
> 2. The {{SplitFetcher}} terminates due to an exception from {{SplitReader.fetch}}. {{SplitFetcher.shutdownHook}} removes this failed fetcher from {{SplitFetcherManager}}.
> 3. In the mailbox thread, {{elementsQueue.poll()}} executes. If there are no elements in the queue, {{elementsQueue}} is reset to unavailable.
> 4. After getting no elements from {{SourceReaderBase.getNextFetch}}, we enter {{SourceReaderBase.finishedOrAvailableLater}}. If the failed fetcher was the last alive fetcher, {{SourceReaderBase.finishedOrAvailableLater}} may evaluate to {{InputStatus.END_OF_INPUT}}.
> 5. {{StreamTask}} terminates itself due to {{InputStatus.END_OF_INPUT}}, without the fetcher's exception ever being reported.
> Here is a revised {{SourceReaderBaseTest.testExceptionInSplitReader}}, which fails about half of the time.
> {code:java}
> 	@Test
> 	public void testExceptionInSplitReader() throws Exception {
> 		expectedException.expect(RuntimeException.class);
> 		expectedException.expectMessage("One or more fetchers have encountered exception");
> 		final String errMsg = "Testing Exception";
> 		FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
> 			new FutureCompletingBlockingQueue<>();
> 		// We have to handle split changes first, otherwise fetch will not be called.
> 		try (MockSourceReader reader = new MockSourceReader(
> 			elementsQueue,
> 			() -> new SplitReader<int[], MockSourceSplit>() {
> 				@Override
> 				public RecordsWithSplitIds<int[]> fetch() {
> 					throw new RuntimeException(errMsg);
> 				}
> 				@Override
> 				public void handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChanges) {}
> 				@Override
> 				public void wakeUp() {
> 				}
> 			},
> 			getConfig(),
> 			null)) {
> 			ValidatingSourceOutput output = new ValidatingSourceOutput();
> 			reader.addSplits(Collections.singletonList(getSplit(0,
> 				NUM_RECORDS_PER_SPLIT,
> 				Boundedness.CONTINUOUS_UNBOUNDED)));
> 			reader.handleSourceEvents(new NoMoreSplitsEvent());
> 			// This is not a real infinite loop, it is supposed to throw exception after some polls.
> 			while (true) {
> 				InputStatus inputStatus = reader.pollNext(output);
> 				assertNotEquals(InputStatus.END_OF_INPUT, inputStatus);
> 				// Thread.sleep(0) instead of Thread.sleep(1) keeps the loop tight, so the race is hit more often.
> 				Thread.sleep(0);
> 			}
> 		}
> 	}
> {code}
> This revised {{SourceReaderBaseTest.testExceptionInSplitReader}} differs from the existing one in three places:
>  1. It calls {{reader.handleSourceEvents(new NoMoreSplitsEvent())}}, which sets {{SourceReaderBase.noMoreSplitsAssignment}} to true.
>  2. It asserts that {{reader.pollNext}} never returns {{InputStatus.END_OF_INPUT}}.
>  3. It changes {{Thread.sleep(1)}} to {{Thread.sleep(0)}}, which increases the failure rate from about 1/200 to about 1/2.
> See [FLINK-19448|https://issues.apache.org/jira/browse/FLINK-19448] for initial discussion.
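
The interleaving described in steps 1 to 5 of the quoted description can be replayed deterministically in a single thread. The sketch below is a toy model, not Flink code: {{PollRaceSketch}}, {{ToyFetcherManager}}, {{failLastFetcher}}, {{pollOnce}} and the {{recheckErrors}} flag are hypothetical names introduced only for illustration. It assumes the failing fetcher runs exactly between the error check and the queue poll, and it shows one possible mitigation (checking for errors again before reporting the end of input), which is not necessarily the fix adopted in Flink.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Toy, single-threaded replay of the race; none of these names are Flink classes. */
public class PollRaceSketch {

    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    /** Hypothetical stand-in for a fetcher manager: live fetcher count plus one stored error. */
    static class ToyFetcherManager {
        int aliveFetchers = 1;
        RuntimeException error;

        /** Rethrows a recorded fetcher failure, if any. */
        void checkErrors() {
            if (error != null) {
                throw new RuntimeException(
                        "One or more fetchers have encountered exception", error);
            }
        }

        /** Step 2: the last fetcher fails, records its error and removes itself. */
        void failLastFetcher() {
            error = new RuntimeException("Testing Exception");
            aliveFetchers--;
        }
    }

    /**
     * One poll with the failure forced between the error check and the queue poll.
     * recheckErrors enables the assumed mitigation.
     */
    static Status pollOnce(ToyFetcherManager manager, Queue<Integer> queue,
                           boolean noMoreSplits, boolean recheckErrors) {
        manager.checkErrors();           // step 1: nothing has failed yet
        manager.failLastFetcher();       // step 2: the fetcher thread "runs" here
        Integer element = queue.poll();  // step 3: the queue is empty
        if (element != null) {
            return Status.MORE_AVAILABLE;
        }
        // Step 4: no splits will arrive and no fetcher is alive.
        if (noMoreSplits && manager.aliveFetchers == 0) {
            if (recheckErrors) {
                manager.checkErrors();   // surfaces the failure instead of ending the input
            }
            return Status.END_OF_INPUT;  // step 5 follows from this status
        }
        return Status.NOTHING_AVAILABLE;
    }

    public static void main(String[] args) {
        Status racy = pollOnce(new ToyFetcherManager(), new ArrayDeque<Integer>(), true, false);
        System.out.println("without re-check: " + racy);
        try {
            pollOnce(new ToyFetcherManager(), new ArrayDeque<Integer>(), true, true);
        } catch (RuntimeException e) {
            System.out.println("with re-check: " + e.getMessage());
        }
    }
}
```

Without the second check the model reports {{END_OF_INPUT}} even though a fetcher has failed. With it, the same interleaving raises the "One or more fetchers have encountered exception" error that the revised test expects.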



--
This message was sent by Atlassian Jira
(v8.3.4#803005)