You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/23 15:25:36 UTC

[flink] branch master updated (6186069 -> d0f9892)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 6186069  [FLINK-17303][python] Return TableResult for Python TableEnvironment
     new d2559d1  [FLINK-17854][core] Move InputStatus to flink-core
     new d0f9892  [FLINK-17854][core] Let SourceReader use InputStatus directly

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../base/source/reader/SourceReaderBase.java       | 13 ++++++-----
 .../base/source/reader/SourceReaderTestBase.java   |  3 ++-
 .../flink/api/connector/source/SourceReader.java   | 19 ++++-----------
 .../org/apache/flink/core}/io/InputStatus.java     | 27 +++++++++++++++-------
 .../connector/source/mocks/MockSourceReader.java   |  7 +++---
 .../streaming/api/operators/SourceOperator.java    | 13 ++---------
 .../runtime/io/MultipleInputSelectionHandler.java  |  1 +
 .../runtime/io/PushingAsyncDataInput.java          |  1 +
 .../streaming/runtime/io/StreamInputProcessor.java |  1 +
 .../runtime/io/StreamMultipleInputProcessor.java   |  1 +
 .../runtime/io/StreamOneInputProcessor.java        |  1 +
 .../runtime/io/StreamTaskNetworkInput.java         |  1 +
 .../runtime/io/StreamTaskSourceInput.java          |  1 +
 .../runtime/io/StreamTwoInputProcessor.java        |  1 +
 .../flink/streaming/runtime/tasks/StreamTask.java  |  2 +-
 .../runtime/io/StreamTaskNetworkInputTest.java     |  1 +
 .../runtime/tasks/MultipleInputStreamTaskTest.java |  2 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    |  2 +-
 18 files changed, 50 insertions(+), 47 deletions(-)
 rename {flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime => flink-core/src/main/java/org/apache/flink/core}/io/InputStatus.java (52%)


[flink] 01/02: [FLINK-17854][core] Move InputStatus to flink-core

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d2559d14ac8f814219a3e4c5e81b77c36b610688
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed May 20 18:49:10 2020 +0200

    [FLINK-17854][core] Move InputStatus to flink-core
    
    That way, the InputStatus can be used by API classes that directly connect with the asynchronous
    input paradigm of the streaming runtime.
---
 .../org/apache/flink/core}/io/InputStatus.java     | 27 +++++++++++++++-------
 .../streaming/api/operators/SourceOperator.java    |  2 +-
 .../runtime/io/MultipleInputSelectionHandler.java  |  1 +
 .../runtime/io/PushingAsyncDataInput.java          |  1 +
 .../streaming/runtime/io/StreamInputProcessor.java |  1 +
 .../runtime/io/StreamMultipleInputProcessor.java   |  1 +
 .../runtime/io/StreamOneInputProcessor.java        |  1 +
 .../runtime/io/StreamTaskNetworkInput.java         |  1 +
 .../runtime/io/StreamTaskSourceInput.java          |  1 +
 .../runtime/io/StreamTwoInputProcessor.java        |  1 +
 .../flink/streaming/runtime/tasks/StreamTask.java  |  2 +-
 .../runtime/io/StreamTaskNetworkInputTest.java     |  1 +
 .../runtime/tasks/MultipleInputStreamTaskTest.java |  2 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    |  2 +-
 14 files changed, 32 insertions(+), 12 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputStatus.java b/flink-core/src/main/java/org/apache/flink/core/io/InputStatus.java
similarity index 52%
rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputStatus.java
rename to flink-core/src/main/java/org/apache/flink/core/io/InputStatus.java
index a2d8d29..7f5a279 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputStatus.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/InputStatus.java
@@ -15,22 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.runtime.io;
+package org.apache.flink.core.io;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput;
+import org.apache.flink.annotation.PublicEvolving;
 
 /**
- * An {@link InputStatus} indicates one input state which might be currently
- * available, not available or already finished. It is returned while calling
- * {@link PushingAsyncDataInput#emitNext(DataOutput)}.
+ * An {@code InputStatus} indicates the availability of data from an asynchronous input.
+ * When asking an asynchronous input to produce data, it returns this status to indicate how to
+ * proceed.
+ *
+ * <p>When the input returns {@link InputStatus#NOTHING_AVAILABLE} it means that no data is available
+ * at this time, but more will (most likely) be available in the future. The asynchronous input
+ * will typically offer to register a <i>Notifier</i> or to obtain a <i>Future</i> that will signal
+ * the availability of new data.
+ *
+ * <p>When the input returns {@link InputStatus#MORE_AVAILABLE}, it can be immediately asked
+ * again to produce more data. That readers from the asynchronous input can bypass subscribing to
+ * a Notifier or a Future for efficiency.
+ *
+ * <p>When the input returns {@link InputStatus#END_OF_INPUT}, then no data will be available again
+ * from this input. It has reached the end of its bounded data.
  */
-@Internal
+@PublicEvolving
 public enum InputStatus {
 
 	/**
 	 * Indicator that more data is available and the input can be called immediately again
-	 * to emit more data.
+	 * to produce more data.
 	 */
 	MORE_AVAILABLE,
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index d8907ac..71e7d70 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
@@ -38,7 +39,6 @@ import org.apache.flink.runtime.source.event.SourceEventWrapper;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
-import org.apache.flink.streaming.runtime.io.InputStatus;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
 import org.apache.flink.util.CollectionUtil;
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/MultipleInputSelectionHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/MultipleInputSelectionHandler.java
index f9e3ca7..6bc4545 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/MultipleInputSelectionHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/MultipleInputSelectionHandler.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.streaming.api.operators.InputSelectable;
 import org.apache.flink.streaming.api.operators.InputSelection;
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/PushingAsyncDataInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/PushingAsyncDataInput.java
index 7fcd047..a0193a6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/PushingAsyncDataInput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/PushingAsyncDataInput.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.runtime.io.AvailabilityProvider;
 import org.apache.flink.runtime.io.PullingAsyncDataInput;
 import org.apache.flink.streaming.api.watermark.Watermark;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 6990379..9718e8c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.io.AvailabilityProvider;
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
index a16cc95..80f803c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java
index 2ca78ce..d520a6e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput;
 import org.apache.flink.streaming.runtime.tasks.OperatorChain;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
index 6723a2d..7134b44 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
index 68ba39a..0b2d065 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.streaming.api.operators.SourceOperator;
 import org.apache.flink.util.IOUtils;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 071c5d6..85b967e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 1c346b1..7c08712 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
@@ -60,7 +61,6 @@ import org.apache.flink.streaming.api.operators.MailboxExecutor;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
-import org.apache.flink.streaming.runtime.io.InputStatus;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
 import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
index 6f7bc47..0237aef 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index 1299740..ed3bd77 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
@@ -44,7 +45,6 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.InputStatus;
 import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index adaf406..b5031b3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -102,7 +103,6 @@ import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
-import org.apache.flink.streaming.runtime.io.InputStatus;
 import org.apache.flink.streaming.runtime.io.MockIndexedInputGate;
 import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;


[flink] 02/02: [FLINK-17854][core] Let SourceReader use InputStatus directly

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d0f9892c58fdb819dc66737b49d7a1f04dde2036
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed May 20 21:52:09 2020 +0200

    [FLINK-17854][core] Let SourceReader use InputStatus directly
---
 .../base/source/reader/SourceReaderBase.java          | 13 +++++++------
 .../base/source/reader/SourceReaderTestBase.java      |  3 ++-
 .../flink/api/connector/source/SourceReader.java      | 19 ++++---------------
 .../api/connector/source/mocks/MockSourceReader.java  |  7 ++++---
 .../flink/streaming/api/operators/SourceOperator.java | 11 +----------
 5 files changed, 18 insertions(+), 35 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 96d813f..b5781de 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -29,6 +29,7 @@ import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
+import org.apache.flink.core.io.InputStatus;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,7 +111,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 	}
 
 	@Override
-	public Status pollNext(SourceOutput<T> sourceOutput) throws Exception {
+	public InputStatus pollNext(SourceOutput<T> sourceOutput) throws Exception {
 		splitFetcherManager.checkErrors();
 		// poll from the queue if the last element was successfully handled. Otherwise
 		// just pass the last element again.
@@ -120,7 +121,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 			recordsWithSplitId = elementsQueue.poll();
 		}
 
-		Status status;
+		InputStatus status;
 		if (newFetch && recordsWithSplitId == null) {
 			// No element available, set to available later if needed.
 			status = finishedOrAvailableLater();
@@ -146,7 +147,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 				status = finishedOrAvailableLater();
 			} else {
 				// There are more records from the current splitIter.
-				status = Status.AVAILABLE_NOW;
+				status = InputStatus.MORE_AVAILABLE;
 			}
 		}
 		LOG.trace("Source reader status: {}", status);
@@ -223,13 +224,13 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 
 	// ------------------ private helper methods ---------------------
 
-	private Status finishedOrAvailableLater() {
+	private InputStatus finishedOrAvailableLater() {
 		boolean allFetchersHaveShutdown = splitFetcherManager.maybeShutdownFinishedFetchers();
 		boolean allElementsEmitted = elementsQueue.isEmpty() && (splitIter == null || !splitIter.hasNext());
 		if (noMoreSplitsAssignment && allFetchersHaveShutdown && allElementsEmitted) {
-			return Status.FINISHED;
+			return InputStatus.END_OF_INPUT;
 		} else {
-			return Status.AVAILABLE_LATER;
+			return InputStatus.NOTHING_AVAILABLE;
 		}
 	}
 }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
index 9f74a45..fcf44fa 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
@@ -106,7 +107,7 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T
 		// Consumer all the records in the s;oit.
 		try (SourceReader<Integer, SplitT> reader = consumeRecords(splits, output, NUM_RECORDS_PER_SPLIT)) {
 			// Now let the main thread poll again.
-			assertEquals("The status should be ", SourceReader.Status.AVAILABLE_LATER, reader.pollNext(output));
+			assertEquals("The status should be ", InputStatus.NOTHING_AVAILABLE, reader.pollNext(output));
 		}
 	}
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
index dedc45e..66a1229 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.connector.source;
 
 import org.apache.flink.annotation.Public;
+import org.apache.flink.core.io.InputStatus;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -45,12 +46,12 @@ public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseab
 	 *
 	 * <p>Although the implementation can emit multiple records into the given SourceOutput,
 	 * it is recommended not doing so. Instead, emit one record into the SourceOutput
-	 * and return a {@link Status#AVAILABLE_NOW} to let the caller thread
+	 * and return a {@link InputStatus#MORE_AVAILABLE} to let the caller thread
 	 * know there are more records available.
 	 *
-	 * @return The {@link Status} of the SourceReader after the method invocation.
+	 * @return The InputStatus of the SourceReader after the method invocation.
 	 */
-	Status pollNext(SourceOutput<T> sourceOutput) throws Exception;
+	InputStatus pollNext(SourceOutput<T> sourceOutput) throws Exception;
 
 	/**
 	 * Checkpoint on the state of the source.
@@ -77,16 +78,4 @@ public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseab
 	 * @param sourceEvent the event sent by the {@link SplitEnumerator}.
 	 */
 	void handleSourceEvents(SourceEvent sourceEvent);
-
-	/**
-	 * The status of this reader.
-	 */
-	enum Status {
-		/** The next record is available right now. */
-		AVAILABLE_NOW,
-		/** The next record will be available later. */
-		AVAILABLE_LATER,
-		/** The source reader has completed all the reading work. */
-		FINISHED
-	}
 }
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
index ba60787..9a7a327 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.connector.source.mocks;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.core.io.InputStatus;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -52,7 +53,7 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit>
 	}
 
 	@Override
-	public Status pollNext(SourceOutput<Integer> sourceOutput) throws Exception {
+	public InputStatus pollNext(SourceOutput<Integer> sourceOutput) throws Exception {
 		boolean finished = true;
 		currentSplitIndex = 0;
 		// Find first splits with available records.
@@ -64,10 +65,10 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit>
 		// Read from the split with available record.
 		if (currentSplitIndex < assignedSplits.size()) {
 			sourceOutput.collect(assignedSplits.get(currentSplitIndex).getNext(false)[0]);
-			return Status.AVAILABLE_NOW;
+			return InputStatus.MORE_AVAILABLE;
 		} else {
 			// In case no split has available record, return depending on whether all the splits has finished.
-			return finished ? Status.FINISHED : Status.AVAILABLE_LATER;
+			return finished ? InputStatus.END_OF_INPUT : InputStatus.NOTHING_AVAILABLE;
 		}
 	}
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 71e7d70..5d4b2d6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -130,16 +130,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 	@Override
 	@SuppressWarnings("unchecked")
 	public InputStatus emitNext(DataOutput<OUT> output) throws Exception {
-		switch (sourceReader.pollNext((SourceOutput<OUT>) output)) {
-			case AVAILABLE_NOW:
-				return InputStatus.MORE_AVAILABLE;
-			case AVAILABLE_LATER:
-				return InputStatus.NOTHING_AVAILABLE;
-			case FINISHED:
-				return InputStatus.END_OF_INPUT;
-			default:
-				throw new IllegalStateException("Should never reach here");
-		}
+		return sourceReader.pollNext((SourceOutput<OUT>) output);
 	}
 
 	@Override