You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 15:25:17 UTC

[flink] 13/14: [FLINK-17696][streaming runtime] Add CoordinatorEventDispatcher to StreamOperatorParameters

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1c9ed3d846435ef36532e9f193327b30c17b5e11
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun May 10 22:43:00 2020 +0200

    [FLINK-17696][streaming runtime] Add CoordinatorEventDispatcher to StreamOperatorParameters
    
    This supports more eager initialization of operators that depend on the CoordinatorEventDispatcher.
---
 .../coordination/OperatorEventDispatcher.java          | 16 +++++++++++++---
 .../api/operators/CoordinatedOperatorFactory.java      | 12 ------------
 .../api/operators/StreamOperatorFactoryUtil.java       | 11 ++---------
 .../api/operators/StreamOperatorParameters.java        | 10 +++++++++-
 .../operators/collect/CollectSinkOperatorFactory.java  | 18 ++++++------------
 .../runtime/tasks/OperatorEventDispatcherImpl.java     | 14 +++++++-------
 6 files changed, 37 insertions(+), 44 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventDispatcher.java
index db9ff94..b9d2648 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventDispatcher.java
@@ -21,10 +21,20 @@ package org.apache.flink.runtime.operators.coordination;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 
 /**
- * The dispatcher through which Operators receive operator events and through which they can send operator
- * events back to the coordinator.
+ * The dispatcher through which Operators receive {@link OperatorEvent}s and through which they can
+ * send OperatorEvents back to the {@code OperatorCoordinator}.
  */
 public interface OperatorEventDispatcher {
 
-	OperatorEventGateway registerEventHandler(OperatorID operator, OperatorEventHandler handler);
+	/**
+	 * Register a listener that is notified every time an OperatorEvent is sent from the
+	 * OperatorCoordinator (of the operator with the given OperatorID) to this subtask.
+	 */
+	void registerEventHandler(OperatorID operator, OperatorEventHandler handler);
+
+	/**
+	 * Gets the gateway through which events can be passed to the OperatorCoordinator for
+	 * the operator identified by the given OperatorID.
+	 */
+	OperatorEventGateway getOperatorEventGateway(OperatorID operatorId);
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CoordinatedOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CoordinatedOperatorFactory.java
index dab5bf3..21bda71 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CoordinatedOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CoordinatedOperatorFactory.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
-import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
 
 /**
  * A factory class for the {@link StreamOperator}s implementing
@@ -43,15 +42,4 @@ public interface CoordinatedOperatorFactory<OUT> extends StreamOperatorFactory<O
 	 * @return the provider of the {@link OperatorCoordinator} for this operator.
 	 */
 	OperatorCoordinator.Provider getCoordinatorProvider(String operatorName, OperatorID operatorID);
-
-	/**
-	 * Sets the {@link OperatorEventDispatcher} for registering the
-	 * {@link org.apache.flink.runtime.operators.coordination.OperatorEventHandler OperaterEventHandler} and setup the
-	 * {@link org.apache.flink.runtime.operators.coordination.OperatorEventGateway OperatorEventGateway} for the
-	 * SourceOperator to send events to the operator coordinator. This method will be invoked before
-	 * {@link #createStreamOperator(StreamOperatorParameters)} is invoked.
-	 *
-	 * @param operatorEventDispatcher the {@link OperatorEventDispatcher} to register the
-	 */
-	void setOperatorEventDispatcher(OperatorEventDispatcher operatorEventDispatcher);
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java
index 91b09ac..fb0917f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java
@@ -24,7 +24,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.util.Preconditions;
 
 import java.util.Optional;
 
@@ -61,20 +60,14 @@ public class StreamOperatorFactoryUtil {
 			((ProcessingTimeServiceAware) operatorFactory).setProcessingTimeService(processingTimeService);
 		}
 
-		if (operatorFactory instanceof CoordinatedOperatorFactory) {
-			Preconditions.checkNotNull(
-					operatorEventDispatcher,
-					"The OperatorEventDispatcher should not be null.");
-			((CoordinatedOperatorFactory<OUT>) operatorFactory).setOperatorEventDispatcher(operatorEventDispatcher);
-		}
-
 		// TODO: what to do with ProcessingTimeServiceAware?
 		OP op = operatorFactory.createStreamOperator(
 			new StreamOperatorParameters<>(
 				containingTask,
 				configuration,
 				output,
-				processingTimeService));
+				processingTimeService,
+				operatorEventDispatcher));
 		return new Tuple2<>(op, Optional.ofNullable(processingTimeService));
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java
index 5aaffcc..70df0e4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -37,16 +38,19 @@ public class StreamOperatorParameters<OUT> {
 	private final StreamConfig config;
 	private final Output<StreamRecord<OUT>> output;
 	private final ProcessingTimeService processingTimeService;
+	private final OperatorEventDispatcher operatorEventDispatcher;
 
 	public StreamOperatorParameters(
 			StreamTask<?, ?> containingTask,
 			StreamConfig config,
 			Output<StreamRecord<OUT>> output,
-			ProcessingTimeService processingTimeService) {
+			ProcessingTimeService processingTimeService,
+			OperatorEventDispatcher operatorEventDispatcher) {
 		this.containingTask = containingTask;
 		this.config = config;
 		this.output = output;
 		this.processingTimeService = processingTimeService;
+		this.operatorEventDispatcher = operatorEventDispatcher;
 	}
 
 	public StreamTask<?, ?> getContainingTask() {
@@ -64,4 +68,8 @@ public class StreamOperatorParameters<OUT> {
 	public ProcessingTimeService getProcessingTimeService() {
 		return processingTimeService;
 	}
+
+	public OperatorEventDispatcher getOperatorEventDispatcher() {
+		return operatorEventDispatcher;
+	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java
index 7e9da6e..8fa9843 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.operators.collect;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
-import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
 import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -36,8 +35,6 @@ public class CollectSinkOperatorFactory extends SimpleUdfStreamOperatorFactory<O
 
 	private final CollectSinkOperator<?> operator;
 
-	private OperatorEventDispatcher operatorEventDispatcher;
-
 	public CollectSinkOperatorFactory(CollectSinkOperator<?> operator) {
 		super(operator);
 		this.operator = operator;
@@ -45,11 +42,13 @@ public class CollectSinkOperatorFactory extends SimpleUdfStreamOperatorFactory<O
 
 	@Override
 	public <T extends StreamOperator<Object>> T  createStreamOperator(StreamOperatorParameters<Object> parameters) {
-		OperatorEventGateway operatorEventGateway = operatorEventDispatcher.registerEventHandler(
-			parameters.getStreamConfig().getOperatorID(),
-			operator);
-		operator.setOperatorEventGateway(operatorEventGateway);
+		final OperatorID operatorId = parameters.getStreamConfig().getOperatorID();
+		final OperatorEventDispatcher eventDispatcher = parameters.getOperatorEventDispatcher();
+
+		operator.setOperatorEventGateway(eventDispatcher.getOperatorEventGateway(operatorId));
 		operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
+		eventDispatcher.registerEventHandler(operatorId, operator);
+
 		return (T) operator;
 	}
 
@@ -58,9 +57,4 @@ public class CollectSinkOperatorFactory extends SimpleUdfStreamOperatorFactory<O
 		operator.getOperatorIdFuture().complete(operatorID);
 		return new CollectSinkOperatorCoordinator.Provider(operatorID);
 	}
-
-	@Override
-	public void setOperatorEventDispatcher(OperatorEventDispatcher operatorEventDispatcher) {
-		this.operatorEventDispatcher = operatorEventDispatcher;
-	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java
index 3407d5b..e6b6ea1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java
@@ -74,18 +74,18 @@ final class OperatorEventDispatcherImpl implements OperatorEventDispatcher {
 	}
 
 	@Override
-	public OperatorEventGateway registerEventHandler(OperatorID operator, OperatorEventHandler handler) {
-		final OperatorEventGateway gateway = new OperatorEventGatewayImpl(toCoordinator, operator);
+	public void registerEventHandler(OperatorID operator, OperatorEventHandler handler) {
 		final OperatorEventHandler prior = handlers.putIfAbsent(operator, handler);
-
-		if (prior == null) {
-			return gateway;
-		}
-		else {
+		if (prior != null) {
 			throw new IllegalStateException("already a handler registered for this operatorId");
 		}
 	}
 
+	@Override
+	public OperatorEventGateway getOperatorEventGateway(OperatorID operatorId) {
+		return new OperatorEventGatewayImpl(toCoordinator, operatorId);
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static final class OperatorEventGatewayImpl implements OperatorEventGateway {