You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/23 15:25:38 UTC

[flink] 02/02: [FLINK-17854][core] Let SourceReader use InputStatus directly

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d0f9892c58fdb819dc66737b49d7a1f04dde2036
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed May 20 21:52:09 2020 +0200

    [FLINK-17854][core] Let SourceReader use InputStatus directly
---
 .../base/source/reader/SourceReaderBase.java          | 13 +++++++------
 .../base/source/reader/SourceReaderTestBase.java      |  3 ++-
 .../flink/api/connector/source/SourceReader.java      | 19 ++++---------------
 .../api/connector/source/mocks/MockSourceReader.java  |  7 ++++---
 .../flink/streaming/api/operators/SourceOperator.java | 11 +----------
 5 files changed, 18 insertions(+), 35 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 96d813f..b5781de 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -29,6 +29,7 @@ import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
+import org.apache.flink.core.io.InputStatus;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,7 +111,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 	}
 
 	@Override
-	public Status pollNext(SourceOutput<T> sourceOutput) throws Exception {
+	public InputStatus pollNext(SourceOutput<T> sourceOutput) throws Exception {
 		splitFetcherManager.checkErrors();
 		// poll from the queue if the last element was successfully handled. Otherwise
 		// just pass the last element again.
@@ -120,7 +121,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 			recordsWithSplitId = elementsQueue.poll();
 		}
 
-		Status status;
+		InputStatus status;
 		if (newFetch && recordsWithSplitId == null) {
 			// No element available, set to available later if needed.
 			status = finishedOrAvailableLater();
@@ -146,7 +147,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 				status = finishedOrAvailableLater();
 			} else {
 				// There are more records from the current splitIter.
-				status = Status.AVAILABLE_NOW;
+				status = InputStatus.MORE_AVAILABLE;
 			}
 		}
 		LOG.trace("Source reader status: {}", status);
@@ -223,13 +224,13 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 
 	// ------------------ private helper methods ---------------------
 
-	private Status finishedOrAvailableLater() {
+	private InputStatus finishedOrAvailableLater() {
 		boolean allFetchersHaveShutdown = splitFetcherManager.maybeShutdownFinishedFetchers();
 		boolean allElementsEmitted = elementsQueue.isEmpty() && (splitIter == null || !splitIter.hasNext());
 		if (noMoreSplitsAssignment && allFetchersHaveShutdown && allElementsEmitted) {
-			return Status.FINISHED;
+			return InputStatus.END_OF_INPUT;
 		} else {
-			return Status.AVAILABLE_LATER;
+			return InputStatus.NOTHING_AVAILABLE;
 		}
 	}
 }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
index 9f74a45..fcf44fa 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
@@ -106,7 +107,7 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T
 		// Consumer all the records in the s;oit.
 		try (SourceReader<Integer, SplitT> reader = consumeRecords(splits, output, NUM_RECORDS_PER_SPLIT)) {
 			// Now let the main thread poll again.
-			assertEquals("The status should be ", SourceReader.Status.AVAILABLE_LATER, reader.pollNext(output));
+			assertEquals("The status should be ", InputStatus.NOTHING_AVAILABLE, reader.pollNext(output));
 		}
 	}
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
index dedc45e..66a1229 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.connector.source;
 
 import org.apache.flink.annotation.Public;
+import org.apache.flink.core.io.InputStatus;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -45,12 +46,12 @@ public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseab
 	 *
 	 * <p>Although the implementation can emit multiple records into the given SourceOutput,
 	 * it is recommended not doing so. Instead, emit one record into the SourceOutput
-	 * and return a {@link Status#AVAILABLE_NOW} to let the caller thread
+	 * and return a {@link InputStatus#MORE_AVAILABLE} to let the caller thread
 	 * know there are more records available.
 	 *
-	 * @return The {@link Status} of the SourceReader after the method invocation.
+	 * @return The InputStatus of the SourceReader after the method invocation.
 	 */
-	Status pollNext(SourceOutput<T> sourceOutput) throws Exception;
+	InputStatus pollNext(SourceOutput<T> sourceOutput) throws Exception;
 
 	/**
 	 * Checkpoint on the state of the source.
@@ -77,16 +78,4 @@ public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseab
 	 * @param sourceEvent the event sent by the {@link SplitEnumerator}.
 	 */
 	void handleSourceEvents(SourceEvent sourceEvent);
-
-	/**
-	 * The status of this reader.
-	 */
-	enum Status {
-		/** The next record is available right now. */
-		AVAILABLE_NOW,
-		/** The next record will be available later. */
-		AVAILABLE_LATER,
-		/** The source reader has completed all the reading work. */
-		FINISHED
-	}
 }
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
index ba60787..9a7a327 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.connector.source.mocks;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.core.io.InputStatus;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -52,7 +53,7 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit>
 	}
 
 	@Override
-	public Status pollNext(SourceOutput<Integer> sourceOutput) throws Exception {
+	public InputStatus pollNext(SourceOutput<Integer> sourceOutput) throws Exception {
 		boolean finished = true;
 		currentSplitIndex = 0;
 		// Find first splits with available records.
@@ -64,10 +65,10 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit>
 		// Read from the split with available record.
 		if (currentSplitIndex < assignedSplits.size()) {
 			sourceOutput.collect(assignedSplits.get(currentSplitIndex).getNext(false)[0]);
-			return Status.AVAILABLE_NOW;
+			return InputStatus.MORE_AVAILABLE;
 		} else {
 			// In case no split has available record, return depending on whether all the splits has finished.
-			return finished ? Status.FINISHED : Status.AVAILABLE_LATER;
+			return finished ? InputStatus.END_OF_INPUT : InputStatus.NOTHING_AVAILABLE;
 		}
 	}
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 71e7d70..5d4b2d6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -130,16 +130,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 	@Override
 	@SuppressWarnings("unchecked")
 	public InputStatus emitNext(DataOutput<OUT> output) throws Exception {
-		switch (sourceReader.pollNext((SourceOutput<OUT>) output)) {
-			case AVAILABLE_NOW:
-				return InputStatus.MORE_AVAILABLE;
-			case AVAILABLE_LATER:
-				return InputStatus.NOTHING_AVAILABLE;
-			case FINISHED:
-				return InputStatus.END_OF_INPUT;
-			default:
-				throw new IllegalStateException("Should never reach here");
-		}
+		return sourceReader.pollNext((SourceOutput<OUT>) output);
 	}
 
 	@Override