You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/03/25 14:07:07 UTC

[flink] 02/11: [FLINK-16316][operators] Remove chaining strategy methods from the StreamOperator interface

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c5eb0e52938f1246dc9b5d07b2a2931743ece035
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Wed Mar 4 16:23:50 2020 +0100

    [FLINK-16316][operators] Remove chaining strategy methods from the StreamOperator interface
    
    These methods have no reason to live on the StreamOperator level now that the
    StreamOperatorFactory concept has been introduced, so this commit moves them to SetupableStreamOperator.
---
 .../org/apache/flink/state/api/output/SnapshotUtilsTest.java | 12 ------------
 .../streaming/api/operators/SetupableStreamOperator.java     |  4 ++++
 .../flink/streaming/api/operators/SimpleOperatorFactory.java |  8 ++++++--
 .../apache/flink/streaming/api/operators/StreamOperator.java |  4 ----
 .../operators/AbstractUdfStreamOperatorLifecycleTest.java    |  2 --
 5 files changed, 10 insertions(+), 20 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
index 01bcbbc..de1f146 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.ttl.mock.MockStateBackend;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -114,17 +113,6 @@ public class SnapshotUtilsTest {
 		}
 
 		@Override
-		public ChainingStrategy getChainingStrategy() {
-			ACTUAL_ORDER_TRACKING.add("getChainingStrategy");
-			return null;
-		}
-
-		@Override
-		public void setChainingStrategy(ChainingStrategy strategy) {
-			ACTUAL_ORDER_TRACKING.add("setChainingStrategy");
-		}
-
-		@Override
 		public MetricGroup getMetricGroup() {
 			ACTUAL_ORDER_TRACKING.add("getMetricGroup");
 			return null;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SetupableStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SetupableStreamOperator.java
index 78fcf30..0fe8348 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SetupableStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SetupableStreamOperator.java
@@ -38,4 +38,8 @@ public interface SetupableStreamOperator<OUT> {
 	 * Initializes the operator. Sets access to the context and the output.
 	 */
 	void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output);
+
+	ChainingStrategy getChainingStrategy();
+
+	void setChainingStrategy(ChainingStrategy strategy);
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java
index d9ee68b..1f43c2e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java
@@ -61,7 +61,9 @@ public class SimpleOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
 
 	protected SimpleOperatorFactory(StreamOperator<OUT> operator) {
 		this.operator = checkNotNull(operator);
-		this.chainingStrategy = operator.getChainingStrategy();
+		if (operator instanceof SetupableStreamOperator) {
+			this.chainingStrategy = ((SetupableStreamOperator) operator).getChainingStrategy();
+		}
 	}
 
 	public StreamOperator<OUT> getOperator() {
@@ -84,7 +86,9 @@ public class SimpleOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
 	@Override
 	public void setChainingStrategy(ChainingStrategy strategy) {
 		this.chainingStrategy = strategy;
-		operator.setChainingStrategy(strategy);
+		if (operator instanceof SetupableStreamOperator) {
+			((SetupableStreamOperator) operator).setChainingStrategy(strategy);
+		}
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index fded061..f5be378 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -137,10 +137,6 @@ public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Dis
 
 	void setKeyContextElement2(StreamRecord<?> record) throws Exception;
 
-	ChainingStrategy getChainingStrategy();
-
-	void setChainingStrategy(ChainingStrategy strategy);
-
 	MetricGroup getMetricGroup();
 
 	OperatorID getOperatorID();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index 1c2240e..ea91bd0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -89,7 +89,6 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 	private static final String ALL_METHODS_STREAM_OPERATOR = "[" +
 			"close[], " +
 			"dispose[], " +
-			"getChainingStrategy[], " +
 			"getCurrentKey[], " +
 			"getMetricGroup[], " +
 			"getOperatorID[], " +
@@ -97,7 +96,6 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 			"notifyCheckpointComplete[long], " +
 			"open[], " +
 			"prepareSnapshotPreBarrier[long], " +
-			"setChainingStrategy[class org.apache.flink.streaming.api.operators.ChainingStrategy], " +
 			"setCurrentKey[class java.lang.Object], " +
 			"setKeyContextElement1[class org.apache.flink.streaming.runtime.streamrecord.StreamRecord], " +
 			"setKeyContextElement2[class org.apache.flink.streaming.runtime.streamrecord.StreamRecord], " +