You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/16 08:05:03 UTC
[flink] 02/04: [FLINK-19225][connectors] Various small improvements
to SourceReaderBase (part 2)
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4700bb5dde3303cbe98882f6beb7379425717b01
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Sep 15 21:27:00 2020 +0200
[FLINK-19225][connectors] Various small improvements to SourceReaderBase (part 2)
- SourceReaderBase no longer returns to the caller / mailbox without emitting an element when
transitioning between fetches; it moves on to the next fetch right away
- Avoid eager checking of queue empty condition (requires lock acquisition) when determining
whether end of input is reached. Check that expensive condition last instead.
---
.../flink/connector/base/source/reader/SourceReaderBase.java | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index fb4e6df9..97a6a95 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -136,7 +136,10 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
return trace(InputStatus.MORE_AVAILABLE);
}
else if (!moveToNextSplit(recordsWithSplitId, output)) {
- return trace(finishedOrAvailableLater());
+ // The current fetch is finished; we only just discovered that and have not emitted anything yet.
+ // We need to move to the next fetch. As a shortcut, we call pollNext() here again,
+ // rather than emitting nothing and waiting for the caller to call us again.
+ return pollNext(output);
}
// else fall through the loop
}
@@ -258,9 +261,8 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
// ------------------ private helper methods ---------------------
private InputStatus finishedOrAvailableLater() {
- boolean allFetchersHaveShutdown = splitFetcherManager.maybeShutdownFinishedFetchers();
- boolean allElementsEmitted = elementsQueue.isEmpty();
- if (noMoreSplitsAssignment && allFetchersHaveShutdown && allElementsEmitted) {
+ final boolean allFetchersHaveShutdown = splitFetcherManager.maybeShutdownFinishedFetchers();
+ if (noMoreSplitsAssignment && allFetchersHaveShutdown && elementsQueue.isEmpty()) {
return InputStatus.END_OF_INPUT;
} else {
return InputStatus.NOTHING_AVAILABLE;