You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 15:25:15 UTC

[flink] 11/14: [hotfix][DataStream API] Minor formatting/warnings fixes in the SourceOperator and StreamOperator code.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1e4880cdd27c26d2b925a3d108178844f7fa95ca
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun May 10 21:12:41 2020 +0200

    [hotfix][DataStream API] Minor formatting/warnings fixes in the SourceOperator and StreamOperator code.
---
 .../flink/streaming/api/operators/AbstractStreamOperator.java     | 6 ++----
 .../org/apache/flink/streaming/api/operators/SourceOperator.java  | 6 +++++-
 .../flink/streaming/api/operators/SourceOperatorFactory.java      | 5 +++++
 .../apache/flink/streaming/api/operators/SourceOperatorTest.java  | 8 ++++++--
 4 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 61925f4..8a4d215 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -156,7 +156,7 @@ public abstract class AbstractStreamOperator<OUT>
 		this.config = config;
 		try {
 			OperatorMetricGroup operatorMetricGroup = environment.getMetricGroup().getOrAddOperator(config.getOperatorID(), config.getOperatorName());
-			this.output = new CountingOutput(output, operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
+			this.output = new CountingOutput<>(output, operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
 			if (config.isChainStart()) {
 				operatorMetricGroup.getIOMetricGroup().reuseInputMetricsForTask();
 			}
@@ -394,7 +394,6 @@ public abstract class AbstractStreamOperator<OUT>
 		return runtimeContext;
 	}
 
-	@SuppressWarnings("unchecked")
 	@VisibleForTesting
 	public <K> KeyedStateBackend<K> getKeyedStateBackend() {
 		return stateHandler.getKeyedStateBackend();
@@ -462,12 +461,10 @@ public abstract class AbstractStreamOperator<OUT>
 		}
 	}
 
-	@SuppressWarnings({"unchecked", "rawtypes"})
 	public void setCurrentKey(Object key) {
 		stateHandler.setCurrentKey(key);
 	}
 
-	@SuppressWarnings({"unchecked", "rawtypes"})
 	public Object getCurrentKey() {
 		return stateHandler.getCurrentKey();
 	}
@@ -551,6 +548,7 @@ public abstract class AbstractStreamOperator<OUT>
 		if (timeServiceManager == null) {
 			throw new RuntimeException("The timer service has not been initialized.");
 		}
+		@SuppressWarnings("unchecked")
 		InternalTimeServiceManager<K> keyedTimeServiceHandler = (InternalTimeServiceManager<K>) timeServiceManager;
 		return keyedTimeServiceHandler.getInternalTimerService(
 			name,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 4b17b0d..3af714d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -51,14 +51,18 @@ import java.util.concurrent.CompletableFuture;
  * the interface of {@link PushingAsyncDataInput} for naturally compatible with one input processing in runtime
  * stack.
  *
- * <p>Note: We are expecting this to be changed to the concrete class once SourceReader interface is introduced.
+ * <p><b>Important Note on Serialization:</b> The SourceOperator inherits the {@link java.io.Serializable}
+ * interface from the StreamOperator, but is in fact NOT serializable. The operator must only be instantiated
+ * in the StreamTask from its factory.
  *
  * @param <OUT> The output type of the operator.
  */
 @Internal
+@SuppressWarnings("serial")
 public class SourceOperator<OUT, SplitT extends SourceSplit>
 		extends AbstractStreamOperator<OUT>
 		implements OperatorEventHandler, PushingAsyncDataInput<OUT> {
+
 	// Package private for unit test.
 	static final ListStateDescriptor<byte[]> SPLITS_STATE_DESC =
 			new ListStateDescriptor<>("SourceReaderState", BytePrimitiveArraySerializer.INSTANCE);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
index 4632b75..79f6453 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
@@ -30,10 +30,15 @@ import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
  */
 public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OUT>
 		implements CoordinatedOperatorFactory<OUT> {
+
+	private static final long serialVersionUID = 1L;
+
 	/** The {@link Source} to create the {@link SourceOperator}. */
 	private final Source<OUT, ?, ?> source;
+
 	/** The number of worker thread for the source coordinator. */
 	private final int numCoordinatorWorkerThread;
+
 	/** The {@link OperatorEventDispatcher} to register the SourceOperator. */
 	private OperatorEventDispatcher operatorEventDispatcher;
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 453b083..653af8b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -58,10 +58,13 @@ import static org.junit.Assert.assertTrue;
 /**
  * Unit test for {@link SourceOperator}.
  */
+@SuppressWarnings("serial")
 public class SourceOperatorTest {
+
 	private static final int NUM_SPLITS = 5;
 	private static final int SUBTASK_INDEX = 1;
 	private static final MockSourceSplit MOCK_SPLIT = new MockSourceSplit(1234, 10);
+
 	private MockSource source;
 	private MockOperatorEventGateway mockGateway;
 	private SourceOperator<Integer, MockSourceSplit> operator;
@@ -176,8 +179,9 @@ public class SourceOperatorTest {
 	/**
 	 * A testing class that overrides the getRuntimeContext() Method.
 	 */
-	private static class TestingSourceOperator<OUT, SplitT extends SourceSplit> extends
-																				SourceOperator<OUT, SplitT> {
+	private static class TestingSourceOperator<OUT, SplitT extends SourceSplit>
+			extends SourceOperator<OUT, SplitT> {
+
 		private final int subtaskIndex;
 
 		TestingSourceOperator(Source<OUT, SplitT, ?> source, int subtaskIndex) {