You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/23 15:25:38 UTC
[flink] 02/02: [FLINK-17854][core] Let SourceReader use InputStatus
directly
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d0f9892c58fdb819dc66737b49d7a1f04dde2036
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed May 20 21:52:09 2020 +0200
[FLINK-17854][core] Let SourceReader use InputStatus directly
---
.../base/source/reader/SourceReaderBase.java | 13 +++++++------
.../base/source/reader/SourceReaderTestBase.java | 3 ++-
.../flink/api/connector/source/SourceReader.java | 19 ++++---------------
.../api/connector/source/mocks/MockSourceReader.java | 7 ++++---
.../flink/streaming/api/operators/SourceOperator.java | 11 +----------
5 files changed, 18 insertions(+), 35 deletions(-)
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 96d813f..b5781de 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -29,6 +29,7 @@ import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
+import org.apache.flink.core.io.InputStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -110,7 +111,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
}
@Override
- public Status pollNext(SourceOutput<T> sourceOutput) throws Exception {
+ public InputStatus pollNext(SourceOutput<T> sourceOutput) throws Exception {
splitFetcherManager.checkErrors();
// poll from the queue if the last element was successfully handled. Otherwise
// just pass the last element again.
@@ -120,7 +121,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
recordsWithSplitId = elementsQueue.poll();
}
- Status status;
+ InputStatus status;
if (newFetch && recordsWithSplitId == null) {
// No element available, set to available later if needed.
status = finishedOrAvailableLater();
@@ -146,7 +147,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
status = finishedOrAvailableLater();
} else {
// There are more records from the current splitIter.
- status = Status.AVAILABLE_NOW;
+ status = InputStatus.MORE_AVAILABLE;
}
}
LOG.trace("Source reader status: {}", status);
@@ -223,13 +224,13 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
// ------------------ private helper methods ---------------------
- private Status finishedOrAvailableLater() {
+ private InputStatus finishedOrAvailableLater() {
boolean allFetchersHaveShutdown = splitFetcherManager.maybeShutdownFinishedFetchers();
boolean allElementsEmitted = elementsQueue.isEmpty() && (splitIter == null || !splitIter.hasNext());
if (noMoreSplitsAssignment && allFetchersHaveShutdown && allElementsEmitted) {
- return Status.FINISHED;
+ return InputStatus.END_OF_INPUT;
} else {
- return Status.AVAILABLE_LATER;
+ return InputStatus.NOTHING_AVAILABLE;
}
}
}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
index 9f74a45..fcf44fa 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
import org.apache.flink.util.TestLogger;
import org.junit.After;
@@ -106,7 +107,7 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T
// Consumer all the records in the s;oit.
try (SourceReader<Integer, SplitT> reader = consumeRecords(splits, output, NUM_RECORDS_PER_SPLIT)) {
// Now let the main thread poll again.
- assertEquals("The status should be ", SourceReader.Status.AVAILABLE_LATER, reader.pollNext(output));
+ assertEquals("The status should be ", InputStatus.NOTHING_AVAILABLE, reader.pollNext(output));
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
index dedc45e..66a1229 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
@@ -19,6 +19,7 @@
package org.apache.flink.api.connector.source;
import org.apache.flink.annotation.Public;
+import org.apache.flink.core.io.InputStatus;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -45,12 +46,12 @@ public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseab
*
* <p>Although the implementation can emit multiple records into the given SourceOutput,
* it is recommended not doing so. Instead, emit one record into the SourceOutput
- * and return a {@link Status#AVAILABLE_NOW} to let the caller thread
+ * and return a {@link InputStatus#MORE_AVAILABLE} to let the caller thread
* know there are more records available.
*
- * @return The {@link Status} of the SourceReader after the method invocation.
+ * @return The InputStatus of the SourceReader after the method invocation.
*/
- Status pollNext(SourceOutput<T> sourceOutput) throws Exception;
+ InputStatus pollNext(SourceOutput<T> sourceOutput) throws Exception;
/**
* Checkpoint on the state of the source.
@@ -77,16 +78,4 @@ public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseab
* @param sourceEvent the event sent by the {@link SplitEnumerator}.
*/
void handleSourceEvents(SourceEvent sourceEvent);
-
- /**
- * The status of this reader.
- */
- enum Status {
- /** The next record is available right now. */
- AVAILABLE_NOW,
- /** The next record will be available later. */
- AVAILABLE_LATER,
- /** The source reader has completed all the reading work. */
- FINISHED
- }
}
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
index ba60787..9a7a327 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.connector.source.mocks;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.core.io.InputStatus;
import java.util.ArrayList;
import java.util.List;
@@ -52,7 +53,7 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit>
}
@Override
- public Status pollNext(SourceOutput<Integer> sourceOutput) throws Exception {
+ public InputStatus pollNext(SourceOutput<Integer> sourceOutput) throws Exception {
boolean finished = true;
currentSplitIndex = 0;
// Find first splits with available records.
@@ -64,10 +65,10 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit>
// Read from the split with available record.
if (currentSplitIndex < assignedSplits.size()) {
sourceOutput.collect(assignedSplits.get(currentSplitIndex).getNext(false)[0]);
- return Status.AVAILABLE_NOW;
+ return InputStatus.MORE_AVAILABLE;
} else {
// In case no split has available record, return depending on whether all the splits has finished.
- return finished ? Status.FINISHED : Status.AVAILABLE_LATER;
+ return finished ? InputStatus.END_OF_INPUT : InputStatus.NOTHING_AVAILABLE;
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 71e7d70..5d4b2d6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -130,16 +130,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
@Override
@SuppressWarnings("unchecked")
public InputStatus emitNext(DataOutput<OUT> output) throws Exception {
- switch (sourceReader.pollNext((SourceOutput<OUT>) output)) {
- case AVAILABLE_NOW:
- return InputStatus.MORE_AVAILABLE;
- case AVAILABLE_LATER:
- return InputStatus.NOTHING_AVAILABLE;
- case FINISHED:
- return InputStatus.END_OF_INPUT;
- default:
- throw new IllegalStateException("Should never reach here");
- }
+ return sourceReader.pollNext((SourceOutput<OUT>) output);
}
@Override