You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/16 08:05:03 UTC

[flink] 02/04: [FLINK-19225][connectors] Various small improvements to SourceReaderBase (part 2)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4700bb5dde3303cbe98882f6beb7379425717b01
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Sep 15 21:27:00 2020 +0200

    [FLINK-19225][connectors] Various small improvements to SourceReaderBase (part 2)
    
      - SourceReaderBase no longer returns to the caller / mailbox without emitting an element
        when transitioning between fetches; it moves on to the next fetch right away
    
      - Avoid eagerly checking the queue-empty condition (which requires acquiring a lock) when
        determining whether end of input is reached. Check that expensive condition last instead.
---
 .../flink/connector/base/source/reader/SourceReaderBase.java   | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index fb4e6df9..97a6a95 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -136,7 +136,10 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 				return trace(InputStatus.MORE_AVAILABLE);
 			}
 			else if (!moveToNextSplit(recordsWithSplitId, output)) {
-				return trace(finishedOrAvailableLater());
+				// The fetch is done and we just discovered that and have not emitted anything, yet.
+				// We need to move to the next fetch. As a shortcut, we call pollNext() here again,
+				// rather than emitting nothing and waiting for the caller to call us again.
+				return pollNext(output);
 			}
 			// else fall through the loop
 		}
@@ -258,9 +261,8 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 	// ------------------ private helper methods ---------------------
 
 	private InputStatus finishedOrAvailableLater() {
-		boolean allFetchersHaveShutdown = splitFetcherManager.maybeShutdownFinishedFetchers();
-		boolean allElementsEmitted = elementsQueue.isEmpty();
-		if (noMoreSplitsAssignment && allFetchersHaveShutdown && allElementsEmitted) {
+		final boolean allFetchersHaveShutdown = splitFetcherManager.maybeShutdownFinishedFetchers();
+		if (noMoreSplitsAssignment && allFetchersHaveShutdown && elementsQueue.isEmpty()) {
 			return InputStatus.END_OF_INPUT;
 		} else {
 			return InputStatus.NOTHING_AVAILABLE;