You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 15:25:15 UTC
[flink] 11/14: [hotfix][DataStream API] Minor formatting/warnings
fixes in the SourceOperator and StreamOperator code.
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1e4880cdd27c26d2b925a3d108178844f7fa95ca
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun May 10 21:12:41 2020 +0200
[hotfix][DataStream API] Minor formatting/warnings fixes in the SourceOperator and StreamOperator code.
---
.../flink/streaming/api/operators/AbstractStreamOperator.java | 6 ++----
.../org/apache/flink/streaming/api/operators/SourceOperator.java | 6 +++++-
.../flink/streaming/api/operators/SourceOperatorFactory.java | 5 +++++
.../apache/flink/streaming/api/operators/SourceOperatorTest.java | 8 ++++++--
4 files changed, 18 insertions(+), 7 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 61925f4..8a4d215 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -156,7 +156,7 @@ public abstract class AbstractStreamOperator<OUT>
this.config = config;
try {
OperatorMetricGroup operatorMetricGroup = environment.getMetricGroup().getOrAddOperator(config.getOperatorID(), config.getOperatorName());
- this.output = new CountingOutput(output, operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
+ this.output = new CountingOutput<>(output, operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
if (config.isChainStart()) {
operatorMetricGroup.getIOMetricGroup().reuseInputMetricsForTask();
}
@@ -394,7 +394,6 @@ public abstract class AbstractStreamOperator<OUT>
return runtimeContext;
}
- @SuppressWarnings("unchecked")
@VisibleForTesting
public <K> KeyedStateBackend<K> getKeyedStateBackend() {
return stateHandler.getKeyedStateBackend();
@@ -462,12 +461,10 @@ public abstract class AbstractStreamOperator<OUT>
}
}
- @SuppressWarnings({"unchecked", "rawtypes"})
public void setCurrentKey(Object key) {
stateHandler.setCurrentKey(key);
}
- @SuppressWarnings({"unchecked", "rawtypes"})
public Object getCurrentKey() {
return stateHandler.getCurrentKey();
}
@@ -551,6 +548,7 @@ public abstract class AbstractStreamOperator<OUT>
if (timeServiceManager == null) {
throw new RuntimeException("The timer service has not been initialized.");
}
+ @SuppressWarnings("unchecked")
InternalTimeServiceManager<K> keyedTimeServiceHandler = (InternalTimeServiceManager<K>) timeServiceManager;
return keyedTimeServiceHandler.getInternalTimerService(
name,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 4b17b0d..3af714d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -51,14 +51,18 @@ import java.util.concurrent.CompletableFuture;
* the interface of {@link PushingAsyncDataInput} for naturally compatible with one input processing in runtime
* stack.
*
- * <p>Note: We are expecting this to be changed to the concrete class once SourceReader interface is introduced.
+ * <p><b>Important Note on Serialization:</b> The SourceOperator inherits the {@link java.io.Serializable}
+ * interface from the StreamOperator, but is in fact NOT serializable. The operator must only be instantiates
+ * in the StreamTask from its factory.
*
* @param <OUT> The output type of the operator.
*/
@Internal
+@SuppressWarnings("serial")
public class SourceOperator<OUT, SplitT extends SourceSplit>
extends AbstractStreamOperator<OUT>
implements OperatorEventHandler, PushingAsyncDataInput<OUT> {
+
// Package private for unit test.
static final ListStateDescriptor<byte[]> SPLITS_STATE_DESC =
new ListStateDescriptor<>("SourceReaderState", BytePrimitiveArraySerializer.INSTANCE);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
index 4632b75..79f6453 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
@@ -30,10 +30,15 @@ import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
*/
public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OUT>
implements CoordinatedOperatorFactory<OUT> {
+
+ private static final long serialVersionUID = 1L;
+
/** The {@link Source} to create the {@link SourceOperator}. */
private final Source<OUT, ?, ?> source;
+
/** The number of worker thread for the source coordinator. */
private final int numCoordinatorWorkerThread;
+
/** The {@link OperatorEventDispatcher} to register the SourceOperator. */
private OperatorEventDispatcher operatorEventDispatcher;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 453b083..653af8b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -58,10 +58,13 @@ import static org.junit.Assert.assertTrue;
/**
* Unit test for {@link SourceOperator}.
*/
+@SuppressWarnings("serial")
public class SourceOperatorTest {
+
private static final int NUM_SPLITS = 5;
private static final int SUBTASK_INDEX = 1;
private static final MockSourceSplit MOCK_SPLIT = new MockSourceSplit(1234, 10);
+
private MockSource source;
private MockOperatorEventGateway mockGateway;
private SourceOperator<Integer, MockSourceSplit> operator;
@@ -176,8 +179,9 @@ public class SourceOperatorTest {
/**
* A testing class that overrides the getRuntimeContext() Method.
*/
- private static class TestingSourceOperator<OUT, SplitT extends SourceSplit> extends
- SourceOperator<OUT, SplitT> {
+ private static class TestingSourceOperator<OUT, SplitT extends SourceSplit>
+ extends SourceOperator<OUT, SplitT> {
+
private final int subtaskIndex;
TestingSourceOperator(Source<OUT, SplitT, ?> source, int subtaskIndex) {