You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/02/14 08:12:54 UTC

[flink] branch master updated (0dc4e76 -> 41b6bfa)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 0dc4e76  [hotfix][docs] Add note SSL not enabled by default
     new a44cf4d  [hotfix][tests] Add a TestingSchedulerFactory to help instantiating the scheduler in tests
     new 41b6bfa  [FLINK-15099][runtime] Add Operator Coordinators and Events

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/util/AutoContextClassLoader.java  |  82 ++++++
 .../state/api/runtime/SavepointEnvironment.java    |  18 ++
 .../flink/runtime/execution/Environment.java       |   6 +
 .../flink/runtime/executiongraph/Execution.java    |  22 ++
 .../runtime/executiongraph/ExecutionJobVertex.java |  23 ++
 .../ExecutionJobVertexCoordinatorContext.java      |  80 ++++++
 .../runtime/executiongraph/ExecutionVertex.java    |   2 +
 .../apache/flink/runtime/jobgraph/JobVertex.java   |  16 +-
 .../runtime/jobgraph/tasks/AbstractInvokable.java  |   8 +
 .../jobgraph/tasks/TaskOperatorEventGateway.java   |  49 ++++
 .../jobmanager/slots/TaskManagerGateway.java       |  12 +-
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  20 +-
 .../flink/runtime/jobmaster/JobMasterGateway.java  |   3 +-
 .../jobmaster/JobMasterOperatorEventGateway.java   |  52 ++++
 .../runtime/jobmaster/RpcTaskManagerGateway.java   |  11 +
 .../coordination/OperatorCoordinator.java          | 108 ++++++++
 .../coordination/OperatorCoordinatorUtil.java      |  61 +++++
 .../operators/coordination/OperatorEvent.java      |  26 ++
 .../coordination/OperatorEventDispatcher.java      |  30 +++
 .../coordination/OperatorEventGateway.java         |  48 ++++
 .../coordination/OperatorEventHandler.java         |  32 +++
 .../coordination/TaskNotRunningException.java      |  33 +++
 .../flink/runtime/scheduler/SchedulerBase.java     |  69 ++++-
 .../flink/runtime/scheduler/SchedulerNG.java       |  18 ++
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  41 +++
 .../runtime/taskexecutor/TaskExecutorGateway.java  |  11 +-
 .../TaskExecutorOperatorEventGateway.java          |  49 ++++
 .../rpc/RpcTaskOperatorEventGateway.java           |  65 +++++
 .../runtime/taskmanager/RuntimeEnvironment.java    |  19 +-
 .../org/apache/flink/runtime/taskmanager/Task.java |  40 +++
 .../ManuallyTriggeredScheduledExecutorService.java |  98 +++++++
 .../utils/SimpleAckingTaskManagerGateway.java      |  12 +
 .../jobmaster/utils/TestingJobMasterGateway.java   |  15 +-
 .../utils/TestingJobMasterGatewayBuilder.java      |  12 +-
 .../OperatorCoordinatorSchedulerTest.java          | 286 +++++++++++++++++++++
 .../operators/coordination/TestOperatorEvent.java  |  26 ++
 .../coordination/TestingOperatorCoordinator.java   | 132 ++++++++++
 .../operators/testutils/DummyEnvironment.java      |   7 +
 .../operators/testutils/MockEnvironment.java       |   7 +
 .../runtime/scheduler/DefaultSchedulerTest.java    |  56 +---
 .../runtime/scheduler/SchedulerTestingUtils.java   | 216 ++++++++++++++++
 .../scheduler/TestExecutionSlotAllocator.java      |   7 +
 .../TestExecutionSlotAllocatorFactory.java         |  12 +-
 .../TaskExecutorOperatorEventHandlingTest.java     | 215 ++++++++++++++++
 .../taskexecutor/TestingTaskExecutorGateway.java   |  19 +-
 .../TestingTaskExecutorGatewayBuilder.java         |  14 +-
 .../taskmanager/NoOpTaskOperatorEventGateway.java  |  34 +++
 .../runtime/taskmanager/TaskAsyncCallTest.java     |   1 +
 .../flink/runtime/taskmanager/TestTaskBuilder.java |   1 +
 .../runtime/testutils/CancelableInvokable.java     |  50 ++++
 .../runtime/util/JvmExitOnFatalErrorTest.java      |   2 +
 .../flink/runtime/util/SerializableFunction.java   |  31 +++
 .../streaming/runtime/tasks/OperatorChain.java     |  20 ++
 .../runtime/tasks/OperatorEventDispatcherImpl.java | 117 +++++++++
 .../flink/streaming/runtime/tasks/StreamTask.java  |  31 +++
 .../tasks/InterruptSensitiveRestoreTest.java       |   2 +
 .../runtime/tasks/StreamMockEnvironment.java       |   7 +
 .../runtime/tasks/StreamTaskTerminationTest.java   |   2 +
 .../streaming/runtime/tasks/StreamTaskTest.java    |   2 +
 .../runtime/tasks/SynchronousCheckpointITCase.java |   2 +
 .../tasks/TaskCheckpointingBehaviourTest.java      |   2 +
 .../apache/flink/core/testutils/FlinkMatchers.java | 161 ++++++++++++
 62 files changed, 2584 insertions(+), 69 deletions(-)
 create mode 100644 flink-core/src/main/java/org/apache/flink/util/AutoContextClassLoader.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexCoordinatorContext.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/TaskOperatorEventGateway.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterOperatorEventGateway.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorUtil.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEvent.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventDispatcher.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventGateway.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventHandler.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/TaskNotRunningException.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventGateway.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcTaskOperatorEventGateway.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutorService.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestOperatorEvent.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskOperatorEventGateway.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CancelableInvokable.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializableFunction.java
 create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java
 create mode 100644 flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java


[flink] 01/02: [hotfix][tests] Add a TestingSchedulerFactory to help instantiating the scheduler in tests

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a44cf4d912cf85c03077c0b766814e877b41e7b1
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sat Dec 7 14:47:43 2019 +0100

    [hotfix][tests] Add a TestingSchedulerFactory to help instantiating the scheduler in tests
---
 .../ManuallyTriggeredScheduledExecutorService.java |  98 ++++++++++++
 .../runtime/scheduler/DefaultSchedulerTest.java    |  56 +------
 .../runtime/scheduler/SchedulerTestingUtils.java   | 170 +++++++++++++++++++++
 3 files changed, 271 insertions(+), 53 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutorService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutorService.java
new file mode 100644
index 0000000..7524f76
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutorService.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Simple {@link ScheduledExecutorService} implementation for testing purposes.
+ */
+public class ManuallyTriggeredScheduledExecutorService extends ManuallyTriggeredScheduledExecutor implements ScheduledExecutorService {
+
+	private boolean shutdown;
+
+	@Override
+	public void shutdown() {
+		shutdown = true;
+	}
+
+	@Override
+	public List<Runnable> shutdownNow() {
+		shutdown();
+		return Collections.emptyList();
+	}
+
+	@Override
+	public boolean isShutdown() {
+		return shutdown; // reflect the flag set by shutdown()/shutdownNow(); keeps isShutdown() consistent with isTerminated()
+	}
+
+	@Override
+	public boolean isTerminated() {
+		return shutdown;
+	}
+
+	@Override
+	public boolean awaitTermination(long timeout, TimeUnit unit) {
+		return true;
+	}
+
+	@Override
+	public <T> Future<T> submit(Callable<T> task) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public <T> Future<T> submit(Runnable task, T result) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public Future<?> submit(Runnable task) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+		throw new UnsupportedOperationException();
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index cce795d..2e9a2fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.hooks.TestMasterHook;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
@@ -48,10 +47,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
 import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
@@ -77,7 +73,6 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -88,6 +83,9 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.acknowledgePendingCheckpoint;
+import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.enableCheckpointing;
+import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.getCheckpointCoordinator;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
 import static org.hamcrest.Matchers.contains;
@@ -688,54 +686,6 @@ public class DefaultSchedulerTest extends TestLogger {
 			new TaskExecutionState(scheduler.getJobGraph().getJobID(), attemptId, ExecutionState.FINISHED));
 	}
 
-	private void acknowledgePendingCheckpoint(final SchedulerBase scheduler, final long checkpointId) throws Exception {
-		final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
-
-		for (ArchivedExecutionVertex executionVertex : scheduler.requestJob().getAllExecutionVertices()) {
-			final ExecutionAttemptID attemptId = executionVertex.getCurrentExecutionAttempt().getAttemptId();
-			final AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
-				scheduler.getJobGraph().getJobID(),
-				attemptId,
-				checkpointId);
-			checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "Unknown location");
-		}
-	}
-
-	private void enableCheckpointing(final JobGraph jobGraph) {
-		final List<JobVertexID> triggerVertices = new ArrayList<>();
-		final List<JobVertexID> ackVertices = new ArrayList<>();
-		final List<JobVertexID> commitVertices = new ArrayList<>();
-
-		for (JobVertex vertex : jobGraph.getVertices()) {
-			if (vertex.isInputVertex()) {
-				triggerVertices.add(vertex.getID());
-			}
-			commitVertices.add(vertex.getID());
-			ackVertices.add(vertex.getID());
-		}
-
-		jobGraph.setSnapshotSettings(
-			new JobCheckpointingSettings(
-				triggerVertices,
-				ackVertices,
-				commitVertices,
-				new CheckpointCoordinatorConfiguration(
-					Long.MAX_VALUE, // disable periodical checkpointing
-					10 * 60 * 1000,
-					0,
-					1,
-					CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-					false,
-					false,
-					0),
-				null));
-	}
-
-	private CheckpointCoordinator getCheckpointCoordinator(final SchedulerBase scheduler) {
-		// TODO: get CheckpointCoordinator from the scheduler directly after it is factored out from ExecutionGraph
-		return scheduler.getExecutionGraph().getCheckpointCoordinator();
-	}
-
 	private void waitForTermination(final DefaultScheduler scheduler) throws Exception {
 		scheduler.getTerminationFuture().get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
new file mode 100644
index 0000000..5297fac
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
+import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * A utility class to create {@link DefaultScheduler} instances for testing.
+ */
+public class SchedulerTestingUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(SchedulerTestingUtils.class);
+
+	private static final long DEFAULT_CHECKPOINT_TIMEOUT_MS = 10 * 60 * 1000;
+
+	public static DefaultScheduler createScheduler(
+			JobGraph jobGraph,
+			ManuallyTriggeredScheduledExecutorService asyncExecutor) throws Exception {
+
+		return new DefaultScheduler(
+			LOG,
+			jobGraph,
+			VoidBackPressureStatsTracker.INSTANCE,
+			Executors.directExecutor(),
+			new Configuration(),
+			new SimpleSlotProvider(jobGraph.getJobID(), 0),
+			asyncExecutor,
+			asyncExecutor,
+			ClassLoader.getSystemClassLoader(),
+			new StandaloneCheckpointRecoveryFactory(),
+			Time.seconds(300),
+			VoidBlobWriter.getInstance(),
+			UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(),
+			Time.seconds(300),
+			NettyShuffleMaster.INSTANCE,
+			NoOpJobMasterPartitionTracker.INSTANCE,
+			new EagerSchedulingStrategy.Factory(),
+			new RestartPipelinedRegionFailoverStrategy.Factory(),
+			new TestRestartBackoffTimeStrategy(true, 0),
+			new DefaultExecutionVertexOperations(),
+			new ExecutionVertexVersioner(),
+			new TestExecutionSlotAllocatorFactory());
+	}
+
+	public static void enableCheckpointing(final JobGraph jobGraph) {
+		final List<JobVertexID> triggerVertices = new ArrayList<>();
+		final List<JobVertexID> allVertices = new ArrayList<>();
+
+		for (JobVertex vertex : jobGraph.getVertices()) {
+			if (vertex.isInputVertex()) {
+				triggerVertices.add(vertex.getID());
+			}
+			allVertices.add(vertex.getID());
+		}
+
+		final CheckpointCoordinatorConfiguration config = new CheckpointCoordinatorConfiguration(
+			Long.MAX_VALUE, // disable periodical checkpointing
+			DEFAULT_CHECKPOINT_TIMEOUT_MS,
+			0,
+			1,
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+			false,
+			false,
+			0);
+
+		jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
+				triggerVertices, allVertices, allVertices,
+				config, null));
+	}
+
+	public static Collection<ExecutionAttemptID> getAllCurrentExecutionAttempts(DefaultScheduler scheduler) {
+		return StreamSupport.stream(scheduler.requestJob().getAllExecutionVertices().spliterator(), false)
+			.map((vertex) -> vertex.getCurrentExecutionAttempt().getAttemptId())
+			.collect(Collectors.toList());
+	}
+
+	public static void setAllExecutionsToRunning(final DefaultScheduler scheduler) {
+		final JobID jid = scheduler.requestJob().getJobID();
+		getAllCurrentExecutionAttempts(scheduler).forEach(
+			(attemptId) -> scheduler.updateTaskExecutionState(new TaskExecutionState(jid, attemptId, ExecutionState.RUNNING))
+		);
+	}
+
+	public static void acknowledgePendingCheckpoint(final DefaultScheduler scheduler, final long checkpointId) throws CheckpointException {
+		final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
+		final JobID jid = scheduler.requestJob().getJobID();
+
+		for (ExecutionAttemptID attemptId : getAllCurrentExecutionAttempts(scheduler)) {
+			final AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(jid, attemptId, checkpointId);
+			checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "Unknown location");
+		}
+	}
+
+	public static CompletedCheckpoint takeCheckpoint(DefaultScheduler scheduler) throws Exception {
+		final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
+		checkpointCoordinator.triggerCheckpoint(System.currentTimeMillis(), false);
+
+		assertEquals("test setup inconsistent", 1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+		final PendingCheckpoint checkpoint = checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
+		final CompletableFuture<CompletedCheckpoint> future = checkpoint.getCompletionFuture();
+
+		acknowledgePendingCheckpoint(scheduler, checkpoint.getCheckpointId());
+
+		CompletedCheckpoint completed = future.getNow(null);
+		assertNotNull("checkpoint not complete", completed);
+		return completed;
+	}
+
+	@SuppressWarnings("deprecation")
+	public static CheckpointCoordinator getCheckpointCoordinator(SchedulerBase scheduler) {
+		return scheduler.getExecutionGraph().getCheckpointCoordinator();
+	}
+}


[flink] 02/02: [FLINK-15099][runtime] Add Operator Coordinators and Events

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 41b6bfa62c61bfe758ce67b06cbf32b3cc937691
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Nov 15 13:12:14 2019 +0100

    [FLINK-15099][runtime] Add Operator Coordinators and Events
    
    Operator Coordinators are instances that exist once per operator. While the operators run on the TaskManagers, the
    coordinator runs on the JobManager. The coordinator communicates via events with the operators, typically to
    assign work.
    
    The first user for those coordinators would be the new source interface.
    Further users we envision are sinks (for coordinated commits of metadata), or iterations (gather progress and
    steer supersteps) as well as simple approximate alignments between streams (event time alignment).
---
 .../apache/flink/util/AutoContextClassLoader.java  |  82 ++++++
 .../state/api/runtime/SavepointEnvironment.java    |  18 ++
 .../flink/runtime/execution/Environment.java       |   6 +
 .../flink/runtime/executiongraph/Execution.java    |  22 ++
 .../runtime/executiongraph/ExecutionJobVertex.java |  23 ++
 .../ExecutionJobVertexCoordinatorContext.java      |  80 ++++++
 .../runtime/executiongraph/ExecutionVertex.java    |   2 +
 .../apache/flink/runtime/jobgraph/JobVertex.java   |  16 +-
 .../runtime/jobgraph/tasks/AbstractInvokable.java  |   8 +
 .../jobgraph/tasks/TaskOperatorEventGateway.java   |  49 ++++
 .../jobmanager/slots/TaskManagerGateway.java       |  12 +-
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  20 +-
 .../flink/runtime/jobmaster/JobMasterGateway.java  |   3 +-
 .../jobmaster/JobMasterOperatorEventGateway.java   |  52 ++++
 .../runtime/jobmaster/RpcTaskManagerGateway.java   |  11 +
 .../coordination/OperatorCoordinator.java          | 108 ++++++++
 .../coordination/OperatorCoordinatorUtil.java      |  61 +++++
 .../operators/coordination/OperatorEvent.java      |  26 ++
 .../coordination/OperatorEventDispatcher.java      |  30 +++
 .../coordination/OperatorEventGateway.java         |  48 ++++
 .../coordination/OperatorEventHandler.java         |  32 +++
 .../coordination/TaskNotRunningException.java      |  33 +++
 .../flink/runtime/scheduler/SchedulerBase.java     |  69 ++++-
 .../flink/runtime/scheduler/SchedulerNG.java       |  18 ++
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  41 +++
 .../runtime/taskexecutor/TaskExecutorGateway.java  |  11 +-
 .../TaskExecutorOperatorEventGateway.java          |  49 ++++
 .../rpc/RpcTaskOperatorEventGateway.java           |  65 +++++
 .../runtime/taskmanager/RuntimeEnvironment.java    |  19 +-
 .../org/apache/flink/runtime/taskmanager/Task.java |  40 +++
 .../utils/SimpleAckingTaskManagerGateway.java      |  12 +
 .../jobmaster/utils/TestingJobMasterGateway.java   |  15 +-
 .../utils/TestingJobMasterGatewayBuilder.java      |  12 +-
 .../OperatorCoordinatorSchedulerTest.java          | 286 +++++++++++++++++++++
 .../operators/coordination/TestOperatorEvent.java  |  26 ++
 .../coordination/TestingOperatorCoordinator.java   | 132 ++++++++++
 .../operators/testutils/DummyEnvironment.java      |   7 +
 .../operators/testutils/MockEnvironment.java       |   7 +
 .../runtime/scheduler/SchedulerTestingUtils.java   |  50 +++-
 .../scheduler/TestExecutionSlotAllocator.java      |   7 +
 .../TestExecutionSlotAllocatorFactory.java         |  12 +-
 .../TaskExecutorOperatorEventHandlingTest.java     | 215 ++++++++++++++++
 .../taskexecutor/TestingTaskExecutorGateway.java   |  19 +-
 .../TestingTaskExecutorGatewayBuilder.java         |  14 +-
 .../taskmanager/NoOpTaskOperatorEventGateway.java  |  34 +++
 .../runtime/taskmanager/TaskAsyncCallTest.java     |   1 +
 .../flink/runtime/taskmanager/TestTaskBuilder.java |   1 +
 .../runtime/testutils/CancelableInvokable.java     |  50 ++++
 .../runtime/util/JvmExitOnFatalErrorTest.java      |   2 +
 .../flink/runtime/util/SerializableFunction.java   |  31 +++
 .../streaming/runtime/tasks/OperatorChain.java     |  20 ++
 .../runtime/tasks/OperatorEventDispatcherImpl.java | 117 +++++++++
 .../flink/streaming/runtime/tasks/StreamTask.java  |  31 +++
 .../tasks/InterruptSensitiveRestoreTest.java       |   2 +
 .../runtime/tasks/StreamMockEnvironment.java       |   7 +
 .../runtime/tasks/StreamTaskTerminationTest.java   |   2 +
 .../streaming/runtime/tasks/StreamTaskTest.java    |   2 +
 .../runtime/tasks/SynchronousCheckpointITCase.java |   2 +
 .../tasks/TaskCheckpointingBehaviourTest.java      |   2 +
 .../apache/flink/core/testutils/FlinkMatchers.java | 161 ++++++++++++
 60 files changed, 2315 insertions(+), 18 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/AutoContextClassLoader.java b/flink-core/src/main/java/org/apache/flink/util/AutoContextClassLoader.java
new file mode 100644
index 0000000..cfe547a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/AutoContextClassLoader.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+/**
+ * Sets a context class loader in a "try-with-resources" pattern.
+ *
+ * <pre>
+ * {@code
+ * try (AutoContextClassLoader ignored = AutoContextClassLoader.of(classloader)) {
+ *     // code that needs the context class loader
+ * }
+ * }
+ * </pre>
+ *
+ * <p>This is conceptually the same as the code below.
+
+ * <pre>
+ * {@code
+ * ClassLoader original = Thread.currentThread().getContextClassLoader();
+ * Thread.currentThread().setContextClassLoader(classloader);
+ * try {
+ *     // code that needs the context class loader
+ * } finally {
+ *     Thread.currentThread().setContextClassLoader(original);
+ * }
+ * }
+ * </pre>
+ */
+public final class AutoContextClassLoader implements AutoCloseable {
+
+	/**
+	 * Sets the context class loader to the given ClassLoader and returns a resource
+	 * that sets it back to the current context ClassLoader when the resource is closed.
+	 *
+	 * <pre>{@code
+	 * try (AutoContextClassLoader ignored = AutoContextClassLoader.of(classloader)) {
+	 *     // code that needs the context class loader
+	 * }
+	 * }</pre>
+	 */
+	public static AutoContextClassLoader of(ClassLoader cl) {
+		final Thread t = Thread.currentThread();
+		final ClassLoader original = t.getContextClassLoader();
+
+		t.setContextClassLoader(cl);
+
+		return new AutoContextClassLoader(t, original);
+	}
+
+	// ------------------------------------------------------------------------
+
+	private final Thread thread;
+
+	private final ClassLoader originalContextClassLoader;
+
+	private AutoContextClassLoader(Thread thread, ClassLoader originalContextClassLoader) {
+		this.thread = thread;
+		this.originalContextClassLoader = originalContextClassLoader;
+	}
+
+	@Override
+	public void close() {
+		thread.setContextClassLoader(originalContextClassLoader);
+	}
+}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
index a7a1db7..95ab72d 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
@@ -38,9 +38,12 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TaskStateManager;
@@ -48,6 +51,7 @@ import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
 
 import java.util.Collections;
 import java.util.Map;
@@ -219,6 +223,11 @@ public class SavepointEnvironment implements Environment {
 	}
 
 	@Override
+	public TaskOperatorEventGateway getOperatorCoordinatorEventGateway() {
+		return new NoOpTaskOperatorEventGateway();
+	}
+
+	@Override
 	public void failExternally(Throwable cause) {
 		ExceptionUtils.rethrow(cause);
 	}
@@ -297,5 +306,14 @@ public class SavepointEnvironment implements Environment {
 				prioritizedOperatorSubtaskState);
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	//  mocks / stand-ins
+	// ------------------------------------------------------------------------
+
+	private static final class NoOpTaskOperatorEventGateway implements TaskOperatorEventGateway {
+		@Override
+		public void sendOperatorEventToCoordinator(OperatorID operator, SerializedValue<OperatorEvent> event) {}
+	}
 }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index fb17725..b90cbb7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -125,6 +126,11 @@ public interface Environment {
 	InputSplitProvider getInputSplitProvider();
 
 	/**
+	 * Gets the gateway through which operators can send events to the operator coordinators.
+	 */
+	TaskOperatorEventGateway getOperatorCoordinatorEventGateway();
+
+	/**
 	 * Returns the current {@link IOManager}.
 	 *
 	 * @return the current {@link IOManager}.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 59be36b..219ff97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
@@ -53,17 +54,21 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.TaskBackPressureResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ProducerDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.slf4j.Logger;
@@ -1030,6 +1035,23 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		}
 	}
 
+	/**
+	 * Sends the operator event to the Task on the Task Executor.
+	 *
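+	 * @return The future returned by the TaskExecutor gateway for the sent event, or a future completed exceptionally with a {@link TaskNotRunningException} if no slot is assigned or the execution is not in state RUNNING.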
+	 */
+	public CompletableFuture<Acknowledge> sendOperatorEvent(OperatorID operatorId, SerializedValue<OperatorEvent> event) {
+		final LogicalSlot slot = assignedResource;
+
+		if (slot != null && getState() == RUNNING) {
+			final TaskExecutorOperatorEventGateway eventGateway = slot.getTaskManagerGateway();
+			return eventGateway.sendOperatorEventToTask(getAttemptId(), operatorId, event);
+		} else {
+			return FutureUtils.completedExceptionally(new TaskNotRunningException(
+				'"' + vertex.getTaskNameWithSubtaskIndex() + "\" is currently not running or ready."));
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//   Callbacks
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 058c166..4b60652 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -46,6 +46,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorUtil;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.OptionalFailure;
@@ -134,6 +136,8 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	 */
 	private Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey = null;
 
+	private final Map<OperatorID, OperatorCoordinator> operatorCoordinators;
+
 	private InputSplitAssigner splitAssigner;
 
 	/**
@@ -244,6 +248,16 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 			}
 		}
 
+		try {
+			this.operatorCoordinators = OperatorCoordinatorUtil.instantiateCoordinators(
+					jobVertex.getOperatorCoordinators(),
+					graph.getUserClassLoader(),
+					(opId) -> new ExecutionJobVertexCoordinatorContext(opId, this));
+		}
+		catch (IOException | ClassNotFoundException e) {
+			throw new JobException("Cannot instantiate the coordinator for operator " + getName(), e);
+		}
+
 		// set up the input splits, if the vertex has any
 		try {
 			@SuppressWarnings("unchecked")
@@ -383,6 +397,15 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		return getJobVertex().getInputDependencyConstraint();
 	}
 
+	@Nullable
+	public OperatorCoordinator getOperatorCoordinator(OperatorID operatorId) {
+		return operatorCoordinators.get(operatorId);
+	}
+
+	public Collection<OperatorCoordinator> getOperatorCoordinators() {
+		return Collections.unmodifiableCollection(operatorCoordinators.values());
+	}
+
 	public Either<SerializedValue<TaskInformation>, PermanentBlobKey> getTaskInformationOrBlobKey() throws IOException {
 		// only one thread should offload the task information, so let's also let only one thread
 		// serialize the task information!
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexCoordinatorContext.java
new file mode 100644
index 0000000..07fb44c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexCoordinatorContext.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An implementation of the {@link OperatorCoordinator.Context} that delegates calls to an
+ * {@link ExecutionJobVertex}.
+ */
+final class ExecutionJobVertexCoordinatorContext implements OperatorCoordinator.Context {
+
+	private final OperatorID operatorId;
+
+	private final ExecutionJobVertex jobVertex;
+
+	ExecutionJobVertexCoordinatorContext(OperatorID operatorId, ExecutionJobVertex jobVertex) {
+		this.operatorId = operatorId;
+		this.jobVertex = jobVertex;
+	}
+
+	@Override
+	public OperatorID getOperatorId() {
+		return operatorId;
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt, int targetSubtask) {
+		final SerializedValue<OperatorEvent> serializedEvent;
+		try {
+			serializedEvent = new SerializedValue<>(evt);
+		}
+		catch (IOException e) {
+			// we do not expect that this exception is handled by the caller, so we make it
+			// unchecked so that it can bubble up
+			throw new FlinkRuntimeException("Cannot serialize operator event", e);
+		}
+
+		return getTaskExecution(targetSubtask).sendOperatorEvent(operatorId, serializedEvent);
+	}
+
+	@Override
+	public void failTask(int subtask, Throwable cause) {
+		final Execution taskExecution = getTaskExecution(subtask);
+		taskExecution.fail(cause);
+	}
+
+	@Override
+	public void failJob(Throwable cause) {
+		jobVertex.getGraph().failGlobal(cause);
+	}
+
+	private Execution getTaskExecution(int subtask) {
+		return jobVertex.getTaskVertices()[subtask].getCurrentExecutionAttempt();
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 6acc519..2a0b432 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -650,6 +650,8 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 				}
 			}
 
+			jobVertex.getOperatorCoordinators().forEach((c -> c.subtaskFailed(getParallelSubtaskIndex())));
+
 			CoLocationGroup grp = jobVertex.getCoLocationGroup();
 			if (grp != null) {
 				locationConstraint = grp.getLocationConstraint(subTaskIndex);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index 79b6dd3..e9616a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -27,11 +27,14 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
 
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -67,7 +70,10 @@ public class JobVertex implements java.io.Serializable {
 	/** List of edges with incoming data. One per Reader. */
 	private final ArrayList<JobEdge> inputs = new ArrayList<>();
 
-	/** Number of subtasks to split this task into at runtime. */
+	/** The list of factories for operator coordinators. */
+	private final ArrayList<SerializedValue<OperatorCoordinator.Provider>> operatorCoordinators = new ArrayList<>();
+
+	/** Number of subtasks to split this task into at runtime.*/
 	private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 
 	/** Maximum number of subtasks to split this task into a runtime. */
@@ -359,6 +365,14 @@ public class JobVertex implements java.io.Serializable {
 		return this.inputs;
 	}
 
+	public List<SerializedValue<OperatorCoordinator.Provider>> getOperatorCoordinators() {
+		return Collections.unmodifiableList(operatorCoordinators);
+	}
+
+	public void addOperatorCoordinator(SerializedValue<OperatorCoordinator.Provider> serializedCoordinatorProvider) {
+		operatorCoordinators.add(serializedCoordinatorProvider);
+	}
+
 	/**
 	 * Associates this vertex with a slot sharing group for scheduling. Different vertices in the same
 	 * slot sharing group can run one subtask each in the same slot.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index b03fe79..5770031 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -24,6 +24,10 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.SerializedValue;
 
 import java.util.concurrent.Future;
 
@@ -256,4 +260,8 @@ public abstract class AbstractInvokable {
 	public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
 		throw new UnsupportedOperationException(String.format("notifyCheckpointCompleteAsync not supported by %s", this.getClass().getName()));
 	}
+
+	public void dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event) throws FlinkException {
+		throw new UnsupportedOperationException("dispatchOperatorEvent not supported by " + getClass().getName());
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/TaskOperatorEventGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/TaskOperatorEventGateway.java
new file mode 100644
index 0000000..c18eabe
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/TaskOperatorEventGateway.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobmaster.JobMasterOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.SerializedValue;
+
+/**
+ * Gateway to send an {@link OperatorEvent} from a Task to the {@link OperatorCoordinator} on the JobManager side.
+ *
+ * <p>This is the second step in the chain of sending Operator Events from Operator to Coordinator.
+ * Each layer adds further context, so that the inner layers do not need to know about the complete context,
+ * which keeps dependencies small and makes testing easier.
+ * <ol>
+ *     <li>{@code OperatorEventGateway} takes the event, enriches the event with the {@link OperatorID}, and
+ *         forwards it to:</li>
+ *     <li>{@link TaskOperatorEventGateway} enriches the event with the {@link ExecutionAttemptID} and
+ *         forwards it to the:</li>
+ *     <li>{@link JobMasterOperatorEventGateway} which is the RPC interface from the TaskManager to the JobManager.</li>
+ * </ol>
+ */
+public interface TaskOperatorEventGateway {
+
+	/**
+	 * Send an event from the operator (identified by the given operator ID) to the operator
+	 * coordinator (identified by the same ID).
+	 */
+	void sendOperatorEventToCoordinator(OperatorID operator, SerializedValue<OperatorEvent> event);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
index 38e6a58..da21982 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
@@ -26,9 +26,13 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.TaskBackPressureResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
+import org.apache.flink.util.SerializedValue;
 
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -36,7 +40,7 @@ import java.util.concurrent.CompletableFuture;
 /**
  * Task manager gateway interface to communicate with the task manager.
  */
-public interface TaskManagerGateway {
+public interface TaskManagerGateway extends TaskExecutorOperatorEventGateway {
 
 	/**
 	 * Return the address of the task manager with which the gateway is associated.
@@ -146,4 +150,10 @@ public interface TaskManagerGateway {
 		final AllocationID allocationId,
 		final Throwable cause,
 		@RpcTimeout final Time timeout);
+
+	@Override
+	CompletableFuture<Acknowledge> sendOperatorEventToTask(
+		ExecutionAttemptID task,
+		OperatorID operator,
+		SerializedValue<OperatorEvent> evt);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index ebf36a2..474ea93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
@@ -61,6 +62,7 @@ import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.registration.RegisteredRpcConnection;
@@ -88,6 +90,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
 
@@ -457,6 +460,21 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	}
 
 	@Override
+	public CompletableFuture<Acknowledge> sendOperatorEventToCoordinator(
+			final ExecutionAttemptID task,
+			final OperatorID operatorID,
+			final SerializedValue<OperatorEvent> serializedEvent) {
+
+		try {
+			final OperatorEvent evt = serializedEvent.deserializeValue(userCodeLoader);
+			schedulerNG.deliverOperatorEventToCoordinator(task, operatorID, evt);
+			return CompletableFuture.completedFuture(Acknowledge.get());
+		} catch (Exception e) {
+			return FutureUtils.completedExceptionally(e);
+		}
+	}
+
+	@Override
 	public CompletableFuture<KvStateLocation> requestKvStateLocation(final JobID jobId, final String registrationName) {
 		try {
 			return CompletableFuture.completedFuture(schedulerNG.requestKvStateLocation(jobId, registrationName));
@@ -679,7 +697,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		}
 
 		Object accumulator = accumulators.get(aggregateName);
-		if(null == accumulator) {
+		if (null == accumulator) {
 			accumulator = aggregateFunction.createAccumulator();
 		}
 		accumulator = aggregateFunction.add(aggregand, accumulator);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 866c2e0..506564d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -57,7 +57,8 @@ public interface JobMasterGateway extends
 	CheckpointCoordinatorGateway,
 	FencedRpcGateway<JobMasterId>,
 	KvStateLocationOracle,
-	KvStateRegistryGateway {
+	KvStateRegistryGateway,
+	JobMasterOperatorEventGateway {
 
 	/**
 	 * Cancels the currently executed job.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterOperatorEventGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterOperatorEventGateway.java
new file mode 100644
index 0000000..4d39e94
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterOperatorEventGateway.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Gateway to send an {@link OperatorEvent} from the Task Manager to the {@link OperatorCoordinator} on
+ * the JobManager side.
+ *
+ * <p>This is the last step in the chain of sending Operator Events from Operator to Coordinator.
+ * Each layer adds further context, so that the inner layers do not need to know about the complete context,
+ * which keeps dependencies small and makes testing easier.
+ * <ol>
+ *     <li>{@code OperatorEventGateway} takes the event, enriches the event with the {@link OperatorID}, and
+ *         forwards it to:</li>
+ *     <li>{@link TaskOperatorEventGateway} enriches the event with the {@link ExecutionAttemptID} and
+ *         forwards it to the:</li>
+ *     <li>{@link JobMasterOperatorEventGateway} which is the RPC interface from the TaskManager to the JobManager.</li>
+ * </ol>
+ */
+public interface JobMasterOperatorEventGateway {
+
+	CompletableFuture<Acknowledge> sendOperatorEventToCoordinator(
+			ExecutionAttemptID task,
+			OperatorID operatorID,
+			SerializedValue<OperatorEvent> event);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
index bb0af2b..9aec97e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -26,11 +26,14 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.TaskBackPressureResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
 
 import java.util.Collections;
 import java.util.Set;
@@ -106,4 +109,12 @@ public class RpcTaskManagerGateway implements TaskManagerGateway {
 			cause,
 			timeout);
 	}
+
+	@Override
+	public CompletableFuture<Acknowledge> sendOperatorEventToTask(
+		ExecutionAttemptID task,
+		OperatorID operator,
+		SerializedValue<OperatorEvent> evt) {
+		return taskExecutorGateway.sendOperatorEventToTask(task, operator, evt);
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
new file mode 100644
index 0000000..b5951c2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+
+import java.io.Serializable;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A coordinator for runtime operators. The OperatorCoordinator runs on the master, associated with
+ * the job vertex of the operator. It communicates with operators by sending operator events.
+ *
+ * <p>Operator coordinators are for example source and sink coordinators that discover and assign
+ * work, or aggregate and commit metadata.
+ */
+public interface OperatorCoordinator extends AutoCloseable {
+
+	/**
+	 * Starts the coordinator. This method is called once at the beginning, before any other methods.
+	 *
+	 * @throws Exception Any exception thrown from this method causes a full job failure.
+	 */
+	void start() throws Exception;
+
+	/**
+	 * This method is called when the coordinator is disposed. This method should release currently
+	 * held resources. Exceptions in this method do not cause the job to fail.
+	 */
+	@Override
+	void close() throws Exception;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Hands an OperatorEvent from a task (on the Task Manager) to this coordinator.
+	 *
+	 * @throws Exception Any exception thrown by this method results in a full job failure and recovery.
+	 */
+	void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Called when one of the subtasks of the task running the coordinated operator failed.
+	 */
+	void subtaskFailed(int subtask);
+
+	// ------------------------------------------------------------------------
+
+	CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception;
+
+	void checkpointComplete(long checkpointId) throws Exception;
+
+	void resetToCheckpoint(byte[] checkpointData) throws Exception;
+
+	// ------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The context gives the OperatorCoordinator access to contextual information and provides a
+	 * gateway to interact with other components, such as sending operator events.
+	 */
+	interface Context {
+
+		OperatorID getOperatorId();
+
+		CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt, int targetSubtask) throws TaskNotRunningException;
+
+		void failTask(int subtask, Throwable cause);
+
+		void failJob(Throwable cause);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The provider creates an OperatorCoordinator and takes a {@link Context} to pass to the OperatorCoordinator.
+	 * Its {@link #create(Context)} method is, for example, called on the job manager when the scheduler
+	 * and execution graph are created, to instantiate the OperatorCoordinator.
+	 *
+	 * <p>The provider is {@link Serializable}, because it is attached to the JobGraph and is part
+	 * of the serialized job graph that is sent to the dispatcher, or stored for recovery.
+	 */
+	interface Provider extends Serializable {
+
+		OperatorID getOperatorId();
+
+		OperatorCoordinator create(Context context);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorUtil.java
new file mode 100644
index 0000000..767bc39
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorUtil.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.util.AutoContextClassLoader;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * A utility for dealing with the {@link OperatorCoordinator}.
+ */
+public final class OperatorCoordinatorUtil {
+
+	public static Map<OperatorID, OperatorCoordinator> instantiateCoordinators(
+			List<SerializedValue<OperatorCoordinator.Provider>> providers,
+			ClassLoader classLoader,
+			Function<OperatorID, OperatorCoordinator.Context> contextFactory) throws IOException, ClassNotFoundException {
+
+		try (AutoContextClassLoader ignored = AutoContextClassLoader.of(classLoader)) {
+
+			final HashMap<OperatorID, OperatorCoordinator> coordinators = new HashMap<>();
+
+			for (SerializedValue<OperatorCoordinator.Provider> serializedProvider : providers) {
+				final OperatorCoordinator.Provider provider = serializedProvider.deserializeValue(classLoader);
+				final OperatorID id = provider.getOperatorId();
+				final OperatorCoordinator.Context context = contextFactory.apply(id);
+				final OperatorCoordinator coordinator = provider.create(context);
+				coordinators.put(id, coordinator);
+			}
+
+			return coordinators;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** Utility class, not meant to be instantiated. */
+	private OperatorCoordinatorUtil() {}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEvent.java
new file mode 100644
index 0000000..1a8fca2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEvent.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import java.io.Serializable;
+
+/**
+ * Root interface for all events sent between {@link OperatorCoordinator} and an {@link OperatorEventHandler}.
+ */
+public interface OperatorEvent extends Serializable {}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventDispatcher.java
new file mode 100644
index 0000000..db9ff94
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventDispatcher.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+
+/**
+ * The dispatcher through which Operators receive operator events and through which they can send operator
+ * events back to the coordinator.
+ */
+public interface OperatorEventDispatcher {
+
+	OperatorEventGateway registerEventHandler(OperatorID operator, OperatorEventHandler handler);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventGateway.java
new file mode 100644
index 0000000..e2c6e40
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventGateway.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterOperatorEventGateway;
+
+/**
+ * The gateway through which an Operator can send an {@link OperatorEvent} to the {@link OperatorCoordinator}
+ * on the JobManager side.
+ *
+ * <p>This is the first step in the chain of sending Operator Events from Operator to Coordinator.
+ * Each layer adds further context, so that the inner layers do not need to know about the complete context,
+ * which keeps dependencies small and makes testing easier.
+ * <ol>
+ *     <li>{@code OperatorEventGateway} takes the event, enriches the event with the {@link OperatorID}, and
+ *         forwards it to:</li>
+ *     <li>{@link TaskOperatorEventGateway} enriches the event with the {@link ExecutionAttemptID} and
+ *         forwards it to the:</li>
+ *     <li>{@link JobMasterOperatorEventGateway} which is the RPC interface from the TaskManager to the JobManager.</li>
+ * </ol>
+ */
+public interface OperatorEventGateway {
+
+	/**
+	 * Sends the given event to the coordinator, where it will be handled by the
+	 * {@link OperatorCoordinator#handleEventFromOperator(int, OperatorEvent)} method.
+	 */
+	void sendEventToCoordinator(OperatorEvent event);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventHandler.java
new file mode 100644
index 0000000..79e9d9f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventHandler.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+/**
+ * Interface for handlers of operator events on the operator side.
+ * Operator events are sent between an {@link OperatorCoordinator} and a runtime operator (which registers
+ * this handler).
+ *
+ * <p>The counterpart to this handler is the {@link OperatorCoordinator#handleEventFromOperator(int, OperatorEvent)}
+ * method.
+ */
+public interface OperatorEventHandler {
+
+	void handleOperatorEvent(OperatorEvent evt);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/TaskNotRunningException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/TaskNotRunningException.java
new file mode 100644
index 0000000..7999ad8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/TaskNotRunningException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * An exception indicating that a target task is not running.
+ */
+public class TaskNotRunningException extends FlinkException {
+
+	private static final long serialVersionUID = 1L;
+
+	public TaskNotRunningException(String message) {
+		super(message);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 609877e..9ac6382 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
@@ -74,6 +75,9 @@ import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
@@ -87,7 +91,10 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.IterableUtils;
 import org.apache.flink.util.function.FunctionUtils;
@@ -400,7 +407,7 @@ public abstract class SchedulerBase implements SchedulerNG {
 		return execution.getVertex().getID();
 	}
 
-	protected ExecutionVertex getExecutionVertex(final ExecutionVertexID executionVertexId) {
+	public ExecutionVertex getExecutionVertex(final ExecutionVertexID executionVertexId) {
 		return executionGraph.getAllVertices().get(executionVertexId.getJobVertexId()).getTaskVertices()[executionVertexId.getSubtaskIndex()];
 	}
 
@@ -440,6 +447,7 @@ public abstract class SchedulerBase implements SchedulerNG {
 	public final void startScheduling() {
 		mainThreadExecutor.assertRunningInMainThread();
 		registerJobMetrics();
+		startAllOperatorCoordinators();
 		startSchedulingInternal();
 	}
 
@@ -456,6 +464,7 @@ public abstract class SchedulerBase implements SchedulerNG {
 
 		incrementVersionsOfAllVertices();
 		executionGraph.suspend(cause);
+		disposeAllOperatorCoordinators();
 	}
 
 	@Override
@@ -886,4 +895,62 @@ public abstract class SchedulerBase implements SchedulerNG {
 			.orElse("Unknown location");
 	}
 
+	@Override
+	public void deliverOperatorEventToCoordinator(
+			final ExecutionAttemptID taskExecutionId,
+			final OperatorID operatorId,
+			final OperatorEvent evt) throws FlinkException {
+
+		// Failure semantics (as per the javadocs of the method):
+		// If the task manager sends an event for a non-running task or a non-existing operator
+		// coordinator, then respond with an exception to the call. If task and coordinator exist,
+		// then we assume that the call from the TaskManager was valid, and any bubbling exception
+		// needs to cause a job failure.
+
+		final Execution exec = executionGraph.getRegisteredExecutions().get(taskExecutionId);
+		if (exec == null || exec.getState() != ExecutionState.RUNNING) {
+			// This situation is common when cancellation happens, or when the task failed while the
+			// event was just being dispatched asynchronously on the TM side.
+			// It should be fine in those expected situations to just ignore this event, but, to be
+			// on the safe side, we notify the TM that the event could not be delivered.
+			throw new TaskNotRunningException("Task is not known or in state running on the JobManager.");
+		}
+
+		final ExecutionJobVertex ejv = exec.getVertex().getJobVertex();
+		final OperatorCoordinator coordinator = ejv.getOperatorCoordinator(operatorId);
+		if (coordinator == null) {
+			throw new FlinkException("No coordinator registered for operator " + operatorId);
+		}
+
+		try {
+			coordinator.handleEventFromOperator(exec.getParallelSubtaskIndex(), evt);
+		} catch (Throwable t) {
+			ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+			failJob(t);
+		}
+	}
+
+	private void startAllOperatorCoordinators() {
+		final Collection<OperatorCoordinator> coordinators = getAllCoordinators();
+		try {
+			for (OperatorCoordinator coordinator : coordinators) {
+				coordinator.start();
+			}
+		}
+		catch (Throwable t) {
+			ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+			coordinators.forEach(IOUtils::closeQuietly);
+			throw new FlinkRuntimeException("Failed to start the operator coordinators", t);
+		}
+	}
+
+	private void disposeAllOperatorCoordinators() {
+		getAllCoordinators().forEach(IOUtils::closeQuietly);
+	}
+
+	private Collection<OperatorCoordinator> getAllCoordinators() {
+		return getExecutionGraph().getAllVertices().values().stream()
+			.flatMap((vertex) -> vertex.getOperatorCoordinators().stream())
+			.collect(Collectors.toList());
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
index 381afb7..5a75d37 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
@@ -34,11 +34,14 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
@@ -120,4 +123,19 @@ public interface SchedulerNG {
 	void declineCheckpoint(DeclineCheckpoint decline);
 
 	CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean advanceToEndOfEventTime);
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Delivers the given OperatorEvent to the {@link OperatorCoordinator} with the given {@link OperatorID}.
+	 *
+	 * <p>Failure semantics: If the task manager sends an event for a non-running task or a
+	 * non-existing operator coordinator, then respond with an exception to the call.
+	 * If task and coordinator exist, then we assume that the call from the TaskManager was
+	 * valid, and any bubbling exception needs to cause a job failure.
+	 *
+	 * @throws FlinkException Thrown, if the task is not running or no operator/coordinator exists
+	 *                        for the given ID.
+	 */
+	void deliverOperatorEventToCoordinator(ExecutionAttemptID taskExecution, OperatorID operator, OperatorEvent evt) throws FlinkException;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 67acbd8..7361818 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -57,7 +57,9 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNo
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionInfo;
 import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
 import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
 import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
@@ -72,6 +74,8 @@ import org.apache.flink.runtime.messages.TaskBackPressureResponse;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
 import org.apache.flink.runtime.query.KvStateClientProxy;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.KvStateServer;
@@ -102,6 +106,7 @@ import org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider;
 import org.apache.flink.runtime.taskexecutor.rpc.RpcKvStateRegistryListener;
 import org.apache.flink.runtime.taskexecutor.rpc.RpcPartitionStateChecker;
 import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.taskexecutor.rpc.RpcTaskOperatorEventGateway;
 import org.apache.flink.runtime.taskexecutor.slot.SlotActions;
 import org.apache.flink.runtime.taskexecutor.slot.SlotNotActiveException;
 import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
@@ -116,6 +121,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.SerializedValue;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
 
@@ -464,6 +470,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 		try {
 			final JobID jobId = tdd.getJobId();
+			final ExecutionAttemptID executionAttemptID = tdd.getExecutionAttemptId();
 			final JobManagerConnection jobManagerConnection = jobManagerTable.get(jobId);
 
 			if (jobManagerConnection == null) {
@@ -528,6 +535,11 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 				tdd.getExecutionAttemptId(),
 				taskManagerConfiguration.getTimeout());
 
+			final TaskOperatorEventGateway taskOperatorEventGateway = new RpcTaskOperatorEventGateway(
+				jobManagerConnection.getJobManagerGateway(),
+				executionAttemptID,
+				(t) -> runAsync(() -> failTask(executionAttemptID, t)));
+
 			TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
 			CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();
 			GlobalAggregateManager aggregateManager = jobManagerConnection.getGlobalAggregateManager();
@@ -578,6 +590,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 				taskManagerActions,
 				inputSplitProvider,
 				checkpointResponder,
+				taskOperatorEventGateway,
 				aggregateManager,
 				blobCacheService,
 				libraryCache,
@@ -949,6 +962,34 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		}
 	}
 
+	// ----------------------------------------------------------------------
+	// Other RPCs
+	// ----------------------------------------------------------------------
+
+	@Override
+	public CompletableFuture<Acknowledge> sendOperatorEventToTask(
+			ExecutionAttemptID executionAttemptID,
+			OperatorID operatorId,
+			SerializedValue<OperatorEvent> evt) {
+
+		log.debug("Operator event for {} - {}", executionAttemptID, operatorId);
+
+		final Task task = taskSlotTable.getTask(executionAttemptID);
+		if (task == null) {
+			return FutureUtils.completedExceptionally(new TaskNotRunningException(
+				"Task " + executionAttemptID.toHexString() + " not running on TaskManager"));
+		}
+
+		try {
+			task.deliverOperatorEvent(operatorId, evt);
+			return CompletableFuture.completedFuture(Acknowledge.get());
+		}
+		catch (Throwable t) {
+			ExceptionUtils.rethrowIfFatalError(t);
+			return FutureUtils.completedExceptionally(t);
+		}
+	}
+
 	// ======================================================================
 	//  Internal methods
 	// ======================================================================
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 6086f67..d61766a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -31,15 +31,18 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.TaskBackPressureResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.types.SerializableOptional;
+import org.apache.flink.util.SerializedValue;
 
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -47,7 +50,7 @@ import java.util.concurrent.CompletableFuture;
 /**
  * {@link TaskExecutor} RPC gateway interface.
  */
-public interface TaskExecutorGateway extends RpcGateway {
+public interface TaskExecutorGateway extends RpcGateway, TaskExecutorOperatorEventGateway {
 
 	/**
 	 * Requests a slot from the TaskManager.
@@ -212,4 +215,10 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * @return Future flag indicating whether the task executor can be released.
 	 */
 	CompletableFuture<Boolean> canBeReleased();
+
+	@Override
+	CompletableFuture<Acknowledge> sendOperatorEventToTask(
+			ExecutionAttemptID task,
+			OperatorID operator,
+			SerializedValue<OperatorEvent> evt);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventGateway.java
new file mode 100644
index 0000000..cd78048
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventGateway.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The gateway through which the {@link OperatorCoordinator} can send an event to an Operator on the Task Manager
+ * side.
+ */
+public interface TaskExecutorOperatorEventGateway {
+
+	/**
+	 * Sends an operator event to an operator in a task executed by the Task Manager (Task Executor).
+	 *
+	 * <p>The reception is acknowledged (future is completed) when the event has been dispatched to the
+	 * {@link org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable#dispatchOperatorEvent(OperatorID, SerializedValue)}
+	 * method. The acknowledgement does not guarantee that the task processed the event successfully.
+	 * Processing failures are up to the task and event sender to handle (for example with an explicit
+	 * response message upon success, or by triggering failure/recovery upon exception).
+	 */
+	CompletableFuture<Acknowledge> sendOperatorEventToTask(
+			ExecutionAttemptID task,
+			OperatorID operator,
+			SerializedValue<OperatorEvent> evt);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcTaskOperatorEventGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcTaskOperatorEventGateway.java
new file mode 100644
index 0000000..834b001
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcTaskOperatorEventGateway.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.rpc;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterOperatorEventGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * A {@link TaskOperatorEventGateway} that calls the RPC gateway {@link JobMasterOperatorEventGateway} to
+ * send the messages to the coordinator. Failed sends are reported to the supplied error handler.
+ */
+public class RpcTaskOperatorEventGateway implements TaskOperatorEventGateway {
+
+	private final JobMasterOperatorEventGateway rpcGateway;
+
+	private final ExecutionAttemptID taskExecutionId;
+
+	private final Consumer<Throwable> errorHandler;
+
+	public RpcTaskOperatorEventGateway(
+			JobMasterOperatorEventGateway rpcGateway,
+			ExecutionAttemptID taskExecutionId,
+			Consumer<Throwable> errorHandler) {
+
+		this.rpcGateway = rpcGateway;
+		this.taskExecutionId = taskExecutionId;
+		this.errorHandler = errorHandler;
+	}
+
+	@Override
+	public void sendOperatorEventToCoordinator(OperatorID operator, SerializedValue<OperatorEvent> event) {
+		final CompletableFuture<Acknowledge> result =
+			rpcGateway.sendOperatorEventToCoordinator(taskExecutionId, operator, event);
+		// Fire-and-forget: the acknowledgement is ignored, only a failure is passed to the error handler.
+		result.whenComplete((success, exception) -> {
+			if (exception != null) {
+				errorHandler.accept(exception);
+			}
+		});
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 7693f82..dc8878b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -35,14 +35,15 @@ import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
 
 import java.util.Map;
 import java.util.concurrent.Future;
-import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -54,9 +55,9 @@ public class RuntimeEnvironment implements Environment {
 	private final JobID jobId;
 	private final JobVertexID jobVertexId;
 	private final ExecutionAttemptID executionId;
-	
+
 	private final TaskInfo taskInfo;
-	
+
 	private final Configuration jobConfiguration;
 	private final Configuration taskConfiguration;
 	private final ExecutionConfig executionConfig;
@@ -69,15 +70,16 @@ public class RuntimeEnvironment implements Environment {
 	private final TaskStateManager taskStateManager;
 	private final GlobalAggregateManager aggregateManager;
 	private final InputSplitProvider splitProvider;
-	
+
 	private final Map<String, Future<Path>> distCacheEntries;
 
 	private final ResultPartitionWriter[] writers;
 	private final InputGate[] inputGates;
 
 	private final TaskEventDispatcher taskEventDispatcher;
-	
+
 	private final CheckpointResponder checkpointResponder;
+	private final TaskOperatorEventGateway operatorEventGateway;
 
 	private final AccumulatorRegistry accumulatorRegistry;
 
@@ -112,6 +114,7 @@ public class RuntimeEnvironment implements Environment {
 			InputGate[] inputGates,
 			TaskEventDispatcher taskEventDispatcher,
 			CheckpointResponder checkpointResponder,
+			TaskOperatorEventGateway operatorEventGateway,
 			TaskManagerRuntimeInfo taskManagerInfo,
 			TaskMetricGroup metrics,
 			Task containingTask) {
@@ -137,6 +140,7 @@ public class RuntimeEnvironment implements Environment {
 		this.inputGates = checkNotNull(inputGates);
 		this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
 		this.checkpointResponder = checkNotNull(checkpointResponder);
+		this.operatorEventGateway = checkNotNull(operatorEventGateway);
 		this.taskManagerInfo = checkNotNull(taskManagerInfo);
 		this.containingTask = containingTask;
 		this.metrics = metrics;
@@ -286,6 +290,11 @@ public class RuntimeEnvironment implements Environment {
 	}
 
 	@Override
+	public TaskOperatorEventGateway getOperatorCoordinatorEventGateway() {
+		return operatorEventGateway;
+	}
+
+	@Override
 	public void failExternally(Throwable cause) {
 		this.containingTask.failExternally(cause);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 3482e6c..15d884b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -57,10 +57,14 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
@@ -206,6 +210,9 @@ public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionPr
 	/** Checkpoint notifier used to communicate with the CheckpointCoordinator. */
 	private final CheckpointResponder checkpointResponder;
 
+	/** The gateway for operators to send messages to the operator coordinators on the Job Manager. */
+	private final TaskOperatorEventGateway operatorCoordinatorEventGateway;
+
 	/** GlobalAggregateManager used to update aggregates on the JobMaster. */
 	private final GlobalAggregateManager aggregateManager;
 
@@ -292,6 +299,7 @@ public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionPr
 		TaskManagerActions taskManagerActions,
 		InputSplitProvider inputSplitProvider,
 		CheckpointResponder checkpointResponder,
+		TaskOperatorEventGateway operatorCoordinatorEventGateway,
 		GlobalAggregateManager aggregateManager,
 		BlobCacheService blobService,
 		LibraryCacheManager libraryCache,
@@ -342,6 +350,7 @@ public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionPr
 
 		this.inputSplitProvider = Preconditions.checkNotNull(inputSplitProvider);
 		this.checkpointResponder = Preconditions.checkNotNull(checkpointResponder);
+		this.operatorCoordinatorEventGateway = Preconditions.checkNotNull(operatorCoordinatorEventGateway);
 		this.aggregateManager = Preconditions.checkNotNull(aggregateManager);
 		this.taskManagerActions = checkNotNull(taskManagerActions);
 
@@ -672,6 +681,7 @@ public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionPr
 				inputGates,
 				taskEventDispatcher,
 				checkpointResponder,
+				operatorCoordinatorEventGateway,
 				taskManagerConfig,
 				metrics,
 				this);
@@ -1205,6 +1215,36 @@ public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionPr
 		}
 	}
 
+	/**
+	 * Dispatches an operator event to the invokable task.
+	 *
+	 * <p>If the dispatch fails while the task is RUNNING, this method fails the task itself and rethrows,
+	 * so callers may use the exception for error reporting but need not fail the task. A dispatch error
+	 * raised after the task has left RUNNING is dropped without an exception.
+	 *
+	 * @throws FlinkException This method throws exceptions indicating the reason why delivery did not succeed.
+	 */
+	public void deliverOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> evt) throws FlinkException {
+		final AbstractInvokable invokable = this.invokable;
+		final ExecutionState currentState = this.executionState; // one snapshot for both check and message
+		if (invokable == null || currentState != ExecutionState.RUNNING) {
+			throw new TaskNotRunningException("Task is not running, but in state " + currentState);
+		}
+
+		try {
+			invokable.dispatchOperatorEvent(operator, evt);
+		}
+		catch (Throwable t) {
+			ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+			// Only fail the task (and report) if it is still running; otherwise the error is dropped.
+			if (getExecutionState() == ExecutionState.RUNNING) {
+				FlinkException e = new FlinkException("Error while handling operator event", t);
+				failExternally(e);
+				throw e;
+			}
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index a0d2dfb..7e6ee2d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -27,9 +27,12 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.TaskBackPressureResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.SerializedValue;
 
 import java.util.Collection;
 import java.util.Set;
@@ -144,6 +147,15 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
 	}
 
 	@Override
+	public CompletableFuture<Acknowledge> sendOperatorEventToTask(
+			ExecutionAttemptID task,
+			OperatorID operator,
+			SerializedValue<OperatorEvent> evt) {
+		// Operator events are not supported by this test gateway: it throws instead of returning a future.
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
 	public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
 		final BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> currentFreeSlotFunction = freeSlotFunction;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index d9a62b9..13d9630 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -35,12 +35,14 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
@@ -50,6 +52,7 @@ import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.function.TriConsumer;
 import org.apache.flink.util.function.TriFunction;
 
@@ -151,6 +154,9 @@ public class TestingJobMasterGateway implements JobMasterGateway {
 	@Nonnull
 	TriFunction<String, Object, byte[], CompletableFuture<Object>> updateAggregateFunction;
 
+	@Nonnull
+	private final TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> operatorEventSender;
+
 	public TestingJobMasterGateway(
 			@Nonnull String address,
 			@Nonnull String hostname,
@@ -178,7 +184,8 @@ public class TestingJobMasterGateway implements JobMasterGateway {
 			@Nonnull BiFunction<JobID, String, CompletableFuture<KvStateLocation>> requestKvStateLocationFunction,
 			@Nonnull Function<Tuple6<JobID, JobVertexID, KeyGroupRange, String, KvStateID, InetSocketAddress>, CompletableFuture<Acknowledge>> notifyKvStateRegisteredFunction,
 			@Nonnull Function<Tuple4<JobID, JobVertexID, KeyGroupRange, String>, CompletableFuture<Acknowledge>> notifyKvStateUnregisteredFunction,
-			@Nonnull TriFunction<String, Object, byte[], CompletableFuture<Object>> updateAggregateFunction) {
+			@Nonnull TriFunction<String, Object, byte[], CompletableFuture<Object>> updateAggregateFunction,
+			@Nonnull TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> operatorEventSender) {
 		this.address = address;
 		this.hostname = hostname;
 		this.cancelFunction = cancelFunction;
@@ -206,6 +213,7 @@ public class TestingJobMasterGateway implements JobMasterGateway {
 		this.notifyKvStateRegisteredFunction = notifyKvStateRegisteredFunction;
 		this.notifyKvStateUnregisteredFunction = notifyKvStateUnregisteredFunction;
 		this.updateAggregateFunction = updateAggregateFunction;
+		this.operatorEventSender = operatorEventSender;
 	}
 
 	@Override
@@ -347,4 +355,9 @@ public class TestingJobMasterGateway implements JobMasterGateway {
 	public CompletableFuture<Object> updateGlobalAggregate(String aggregateName, Object aggregand, byte[] serializedAggregateFunction) {
 		return updateAggregateFunction.apply(aggregateName, aggregand, serializedAggregateFunction);
 	}
+
+	@Override
+	public CompletableFuture<Acknowledge> sendOperatorEventToCoordinator(ExecutionAttemptID task, OperatorID operatorID, SerializedValue<OperatorEvent> event) {
+		return operatorEventSender.apply(task, operatorID, event);
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
index 6b4f2b7..f2a47e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
@@ -34,12 +34,14 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -50,6 +52,7 @@ import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.function.TriConsumer;
 import org.apache.flink.util.function.TriFunction;
 
@@ -98,6 +101,7 @@ public class TestingJobMasterGatewayBuilder {
 	private Function<Tuple6<JobID, JobVertexID, KeyGroupRange, String, KvStateID, InetSocketAddress>, CompletableFuture<Acknowledge>> notifyKvStateRegisteredFunction = ignored -> CompletableFuture.completedFuture(Acknowledge.get());
 	private Function<Tuple4<JobID, JobVertexID, KeyGroupRange, String>, CompletableFuture<Acknowledge>> notifyKvStateUnregisteredFunction = ignored -> CompletableFuture.completedFuture(Acknowledge.get());
 	private TriFunction<String, Object, byte[], CompletableFuture<Object>> updateAggregateFunction = (a, b, c) -> CompletableFuture.completedFuture(new Object());
+	private TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> operatorEventSender = (a, b, c) -> CompletableFuture.completedFuture(Acknowledge.get());
 
 	public TestingJobMasterGatewayBuilder setAddress(String address) {
 		this.address = address;
@@ -234,6 +238,11 @@ public class TestingJobMasterGatewayBuilder {
 		return this;
 	}
 
+	public TestingJobMasterGatewayBuilder setOperatorEventSender(TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> operatorEventSender) {
+		this.operatorEventSender = operatorEventSender;
+		return this;
+	}
+
 	public TestingJobMasterGateway build() {
 		return new TestingJobMasterGateway(
 			address,
@@ -262,6 +271,7 @@ public class TestingJobMasterGatewayBuilder {
 			requestKvStateLocationFunction,
 			notifyKvStateRegisteredFunction,
 			notifyKvStateUnregisteredFunction,
-			updateAggregateFunction);
+			updateAggregateFunction,
+			operatorEventSender);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
new file mode 100644
index 0000000..17a82f0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the integration of the {@link OperatorCoordinator} with the scheduler, to ensure the relevant
+ * actions are leading to the right method invocations on the coordinator.
+ */
+@SuppressWarnings("serial")
+public class OperatorCoordinatorSchedulerTest extends TestLogger {
+
+	private final JobVertexID testVertexId = new JobVertexID();
+	private final OperatorID testOperatorId = new OperatorID();
+
+	private final ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();
+
+	// ------------------------------------------------------------------------
+	//  tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testCoordinatorStartedWhenSchedulerStarts() throws Exception {
+		final DefaultScheduler scheduler = createAndStartScheduler();
+		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+		assertTrue(coordinator.isStarted());
+	}
+
+	@Test
+	public void testCoordinatorDisposedWhenSchedulerStops() throws Exception {
+		final DefaultScheduler scheduler = createAndStartScheduler();
+		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+		scheduler.suspend(new Exception("test suspend"));
+
+		assertTrue(coordinator.isClosed());
+	}
+
+	@Test
+	public void testFailureToStartPropagatesExceptions() throws Exception {
+		final OperatorCoordinator.Provider failingCoordinatorProvider =
+			new TestingOperatorCoordinator.Provider(testOperatorId, CoordinatorThatFailsInStart::new);
+		final DefaultScheduler scheduler = createScheduler(failingCoordinatorProvider);
+
+		try {
+			scheduler.startScheduling();
+			fail("expected an exception");
+		} catch (Exception ignored) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testFailureToStartClosesCoordinator() throws Exception {
+		final OperatorCoordinator.Provider failingCoordinatorProvider =
+				new TestingOperatorCoordinator.Provider(testOperatorId, CoordinatorThatFailsInStart::new);
+		final DefaultScheduler scheduler = createScheduler(failingCoordinatorProvider);
+		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+		try {
+			scheduler.startScheduling();
+		} catch (Exception ignored) { /* expected: start() fails; this test only checks the close */ }
+
+		assertTrue(coordinator.isClosed());
+	}
+
+	@Test
+	public void taskFailureNotifiesCoordinator() throws Exception {
+		final DefaultScheduler scheduler = createAndStartScheduler();
+		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+		failTask(scheduler, 1);
+		executor.triggerScheduledTasks();
+
+		assertEquals(1, coordinator.getFailedTasks().size());
+		assertThat(coordinator.getFailedTasks(), contains(1));
+		assertThat(coordinator.getFailedTasks(), not(contains(0)));
+	}
+
+	@Test
+	public void taskRepeatedFailureNotifyCoordinator() throws Exception {
+		final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
+		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+		failTask(scheduler, 0);
+		executor.triggerScheduledTasks();
+		failTask(scheduler, 0);
+		executor.triggerScheduledTasks();
+
+		assertEquals(2, coordinator.getFailedTasks().size());
+		assertThat(coordinator.getFailedTasks(), contains(0, 0));
+	}
+
+	@Test
+	public void taskExceptionWhenTasksNotRunning() throws Exception {
+		final DefaultScheduler scheduler = createAndStartScheduler();
+		final OperatorCoordinator.Context context = getCoordinator(scheduler).getContext();
+
+		final CompletableFuture<?> result = context.sendEvent(new TestOperatorEvent(), 0);
+
+		assertThat(result, futureFailedWith(TaskNotRunningException.class));
+	}
+
+	@Test
+	public void taskTaskManagerFailuresAreReportedBack() throws Exception {
+		final DefaultScheduler scheduler = createSchedulerAndDeployTasks(new FailingTaskExecutorOperatorEventGateway());
+
+		final OperatorCoordinator.Context context = getCoordinator(scheduler).getContext();
+		final CompletableFuture<?> result = context.sendEvent(new TestOperatorEvent(), 0);
+
+		assertThat(result, futureFailedWith(TestException.class));
+	}
+
+	// ------------------------------------------------------------------------
+	//  test setups
+	// ------------------------------------------------------------------------
+
+	private DefaultScheduler createScheduler(OperatorCoordinator.Provider provider) throws Exception {
+		return setupTestJobAndScheduler(provider, null, false);
+	}
+
+	private DefaultScheduler createAndStartScheduler() throws Exception {
+		final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId), null, false);
+		scheduler.startScheduling();
+		return scheduler;
+	}
+
+	private DefaultScheduler createSchedulerAndDeployTasks() throws Exception {
+		final DefaultScheduler scheduler = createAndStartScheduler();
+		SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
+		return scheduler;
+	}
+
+	private DefaultScheduler createSchedulerAndDeployTasks(TaskExecutorOperatorEventGateway gateway) throws Exception {
+		final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId), gateway, false);
+		scheduler.startScheduling();
+		SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
+		return scheduler;
+	}
+
+	private DefaultScheduler createSchedulerWithCheckpointing() throws Exception {
+		final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId), null, true);
+		scheduler.startScheduling();
+		SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
+		return scheduler;
+	}
+
+	private DefaultScheduler setupTestJobAndScheduler(
+			OperatorCoordinator.Provider provider,
+			@Nullable TaskExecutorOperatorEventGateway taskExecutorOperatorEventGateway,
+			boolean enableCheckpoints) throws Exception {
+
+		final JobVertex vertex = new JobVertex("Vertex with OperatorCoordinator", testVertexId);
+		vertex.setInvokableClass(NoOpInvokable.class);
+		vertex.addOperatorCoordinator(new SerializedValue<>(provider));
+		vertex.setParallelism(2);
+
+		final JobGraph jobGraph = new JobGraph("test job with OperatorCoordinator", vertex);
+		if (enableCheckpoints) {
+			SchedulerTestingUtils.enableCheckpointing(jobGraph);
+		}
+
+		final DefaultScheduler scheduler = taskExecutorOperatorEventGateway == null
+				? SchedulerTestingUtils.createScheduler(jobGraph, executor)
+				: SchedulerTestingUtils.createScheduler(jobGraph, executor, taskExecutorOperatorEventGateway);
+		scheduler.setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+		return scheduler;
+	}
+
+	private TestingOperatorCoordinator getCoordinator(DefaultScheduler scheduler) {
+		final ExecutionJobVertex vertexWithCoordinator = getJobVertex(scheduler, testVertexId);
+		assertNotNull("vertex for coordinator not found", vertexWithCoordinator);
+
+		final OperatorCoordinator coordinator = vertexWithCoordinator.getOperatorCoordinator(testOperatorId);
+		assertNotNull("vertex does not contain coordinator", coordinator);
+		assertThat(coordinator, instanceOf(TestingOperatorCoordinator.class));
+
+		return (TestingOperatorCoordinator) coordinator;
+	}
+
+	// ------------------------------------------------------------------------
+	//  test actions
+	// ------------------------------------------------------------------------
+
+	private void failTask(DefaultScheduler scheduler, int subtask) {
+		final ExecutionJobVertex ejv = getJobVertex(scheduler, testVertexId);
+		assertNotNull("vertex for coordinator not found", ejv);
+		final ExecutionAttemptID attemptID = ejv.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
+
+		scheduler.updateTaskExecutionState(new TaskExecutionState(
+				ejv.getJobId(), attemptID, ExecutionState.FAILED, new Exception("test task failure")));
+	}
+
+	// ------------------------------------------------------------------------
+	//  miscellaneous utilities
+	// ------------------------------------------------------------------------
+
+	private static ExecutionJobVertex getJobVertex(DefaultScheduler scheduler, JobVertexID jobVertexId) {
+		final ExecutionVertexID id = new ExecutionVertexID(jobVertexId, 0);
+		return scheduler.getExecutionVertex(id).getJobVertex();
+	}
+
+	// ------------------------------------------------------------------------
+	//  test mocks
+	// ------------------------------------------------------------------------
+
+	private static final class TestOperatorEvent implements OperatorEvent {}
+
+	private static final class TestException extends Exception {}
+
+	private static final class CoordinatorThatFailsInStart extends TestingOperatorCoordinator {
+
+		public CoordinatorThatFailsInStart(Context context) {
+			super(context);
+		}
+
+		@Override
+		public void start() throws Exception {
+			throw new Exception("test failure");
+		}
+	}
+
+	private static final class FailingTaskExecutorOperatorEventGateway implements TaskExecutorOperatorEventGateway {
+
+		@Override
+		public CompletableFuture<Acknowledge> sendOperatorEventToTask(
+				ExecutionAttemptID task,
+				OperatorID operator,
+				SerializedValue<OperatorEvent> evt) {
+			return FutureUtils.completedExceptionally(new TestException());
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestOperatorEvent.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestOperatorEvent.java
new file mode 100644
index 0000000..6eb600b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestOperatorEvent.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+/**
+ * A dummy/mock implementation of an {@link OperatorEvent}.
+ */
+public final class TestOperatorEvent implements OperatorEvent {
+	private static final long serialVersionUID = 1L;
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
new file mode 100644
index 0000000..20fcefe
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.util.SerializableFunction;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A simple testing implementation of the {@link OperatorCoordinator}.
+ */
+class TestingOperatorCoordinator implements OperatorCoordinator {
+
+	private final OperatorCoordinator.Context context;
+
+	private final ArrayList<Integer> failedTasks = new ArrayList<>();
+
+	private boolean started;
+	private boolean closed;
+
+	public TestingOperatorCoordinator(OperatorCoordinator.Context context) {
+		this.context = context;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void start() throws Exception {
+		started = true;
+	}
+
+	@Override
+	public void close() {
+		closed = true;
+	}
+
+	@Override
+	public void handleEventFromOperator(int subtask, OperatorEvent event) {}
+
+	@Override
+	public void subtaskFailed(int subtask) {
+		failedTasks.add(subtask);
+	}
+
+	@Override
+	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void checkpointComplete(long checkpointId) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void resetToCheckpoint(byte[] checkpointData) {
+		throw new UnsupportedOperationException();
+	}
+
+	// ------------------------------------------------------------------------
+
+	public OperatorCoordinator.Context getContext() {
+		return context;
+	}
+
+	public boolean isStarted() {
+		return started;
+	}
+
+	public boolean isClosed() {
+		return closed;
+	}
+
+	public Collection<Integer> getFailedTasks() {
+		return failedTasks;
+	}
+
+	// ------------------------------------------------------------------------
+	//  The provider for this coordinator implementation
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A testing stub for an {@link OperatorCoordinator.Provider} that creates a
+	 * {@link TestingOperatorCoordinator}.
+	 */
+	public static final class Provider implements OperatorCoordinator.Provider {
+
+		private static final long serialVersionUID = 1L;
+
+		private final OperatorID operatorId;
+
+		private final SerializableFunction<Context, TestingOperatorCoordinator> factory;
+
+		public Provider(OperatorID operatorId) {
+			this(operatorId, TestingOperatorCoordinator::new);
+		}
+
+		public Provider(OperatorID operatorId, SerializableFunction<Context, TestingOperatorCoordinator> factory) {
+			this.operatorId = operatorId;
+			this.factory = factory;
+		}
+
+		@Override
+		public OperatorID getOperatorId() {
+			return operatorId;
+		}
+
+		@Override
+		public OperatorCoordinator create(OperatorCoordinator.Context context) {
+			return factory.apply(context);
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 674caf9..910fb30 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
@@ -44,6 +45,7 @@ import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
 import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
+import org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 
@@ -235,4 +237,9 @@ public class DummyEnvironment implements Environment {
 	public void setTaskStateManager(TaskStateManager taskStateManager) {
 		this.taskStateManager = taskStateManager;
 	}
+
+	@Override
+	public TaskOperatorEventGateway getOperatorCoordinatorEventGateway() {
+		return new NoOpTaskOperatorEventGateway();
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 52b00b8..415388c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTe
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.memory.MemoryManagerBuilder;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
@@ -46,6 +47,7 @@ import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
+import org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
@@ -337,6 +339,11 @@ public class MockEnvironment implements Environment, AutoCloseable {
 	}
 
 	@Override
+	public TaskOperatorEventGateway getOperatorCoordinatorEventGateway() {
+		return new NoOpTaskOperatorEventGateway();
+	}
+
+	@Override
 	public void failExternally(Throwable cause) {
 		if (!expectedExternalFailureCause.isPresent()) {
 			throw new UnsupportedOperationException("MockEnvironment does not support external task failure.");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index 5297fac..7a1f456 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -34,19 +34,26 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
 import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
 import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,13 +81,33 @@ public class SchedulerTestingUtils {
 			JobGraph jobGraph,
 			ManuallyTriggeredScheduledExecutorService asyncExecutor) throws Exception {
 
+		return createScheduler(jobGraph, asyncExecutor, new SimpleAckingTaskManagerGateway());
+	}
+
+	public static DefaultScheduler createScheduler(
+			JobGraph jobGraph,
+			ManuallyTriggeredScheduledExecutorService asyncExecutor,
+			TaskExecutorOperatorEventGateway operatorEventGateway) throws Exception {
+
+		final TaskManagerGateway gateway = operatorEventGateway instanceof TaskManagerGateway
+				? (TaskManagerGateway) operatorEventGateway
+				: new TaskExecutorOperatorEventGatewayAdapter(operatorEventGateway);
+
+		return createScheduler(jobGraph, asyncExecutor, gateway);
+	}
+
+	public static DefaultScheduler createScheduler(
+			JobGraph jobGraph,
+			ManuallyTriggeredScheduledExecutorService asyncExecutor,
+			TaskManagerGateway taskManagerGateway) throws Exception {
+
 		return new DefaultScheduler(
 			LOG,
 			jobGraph,
 			VoidBackPressureStatsTracker.INSTANCE,
 			Executors.directExecutor(),
 			new Configuration(),
-			new SimpleSlotProvider(jobGraph.getJobID(), 0),
+			new SimpleSlotProvider(jobGraph.getJobID(), 0), // this is not used any more in the new scheduler
 			asyncExecutor,
 			asyncExecutor,
 			ClassLoader.getSystemClassLoader(),
@@ -96,7 +123,7 @@ public class SchedulerTestingUtils {
 			new TestRestartBackoffTimeStrategy(true, 0),
 			new DefaultExecutionVertexOperations(),
 			new ExecutionVertexVersioner(),
-			new TestExecutionSlotAllocatorFactory());
+			new TestExecutionSlotAllocatorFactory(taskManagerGateway));
 	}
 
 	public static void enableCheckpointing(final JobGraph jobGraph) {
@@ -167,4 +194,23 @@ public class SchedulerTestingUtils {
 	public static CheckpointCoordinator getCheckpointCoordinator(SchedulerBase scheduler) {
 		return scheduler.getExecutionGraph().getCheckpointCoordinator();
 	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class TaskExecutorOperatorEventGatewayAdapter extends SimpleAckingTaskManagerGateway {
+
+		private final TaskExecutorOperatorEventGateway operatorGateway;
+
+		TaskExecutorOperatorEventGatewayAdapter(TaskExecutorOperatorEventGateway operatorGateway) {
+			this.operatorGateway = operatorGateway;
+		}
+
+		@Override
+		public CompletableFuture<Acknowledge> sendOperatorEventToTask(
+				ExecutionAttemptID task,
+				OperatorID operator,
+				SerializedValue<OperatorEvent> evt) {
+			return operatorGateway.sendOperatorEventToTask(task, operator, evt);
+		}
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java
index 727bc5f..36aec27 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.runtime.scheduler;
 
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
@@ -47,6 +48,12 @@ public class TestExecutionSlotAllocator implements ExecutionSlotAllocator, SlotO
 
 	private final List<LogicalSlot> returnedSlots = new ArrayList<>();
 
+	public TestExecutionSlotAllocator() {}
+
+	public TestExecutionSlotAllocator(TaskManagerGateway taskManagerGateway) {
+		logicalSlotBuilder.setTaskManagerGateway(taskManagerGateway);
+	}
+
 	@Override
 	public List<SlotExecutionVertexAssignment> allocateSlotsFor(final List<ExecutionVertexSchedulingRequirements> schedulingRequirementsCollection) {
 		final List<SlotExecutionVertexAssignment> slotVertexAssignments = createSlotVertexAssignments(schedulingRequirementsCollection);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java
index e5d451a..8bd2fa8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java
@@ -19,12 +19,22 @@
 
 package org.apache.flink.runtime.scheduler;
 
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+
 /**
  * Factory for {@link TestExecutionSlotAllocatorFactory}.
  */
 public class TestExecutionSlotAllocatorFactory implements ExecutionSlotAllocatorFactory {
 
-	private final TestExecutionSlotAllocator testExecutionSlotAllocator = new TestExecutionSlotAllocator();
+	private final TestExecutionSlotAllocator testExecutionSlotAllocator;
+
+	public TestExecutionSlotAllocatorFactory() {
+		this.testExecutionSlotAllocator = new TestExecutionSlotAllocator();
+	}
+
+	public TestExecutionSlotAllocatorFactory(TaskManagerGateway gateway) {
+		this.testExecutionSlotAllocator = new TestExecutionSlotAllocator(gateway);
+	}
 
 	@Override
 	public ExecutionSlotAllocator createInstance(final InputsLocationsRetriever ignored) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java
new file mode 100644
index 0000000..6fb3cb0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TestOperatorEvent;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.testutils.CancelableInvokable;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.core.testutils.FlinkMatchers.futureWillCompleteExceptionally;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test for the (failure handling of the) delivery of Operator Events.
+ */
+public class TaskExecutorOperatorEventHandlingTest {
+
+	private MetricRegistryImpl metricRegistry;
+
+	private TestingRpcService rpcService;
+
+	@Before
+	public void setup() {
+		rpcService = new TestingRpcService();
+		metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+		metricRegistry.startQueryService(rpcService, new ResourceID("mqs"));
+	}
+
+	@After
+	public void teardown() throws ExecutionException, InterruptedException {
+		if (rpcService != null) {
+			rpcService.stopService().get();
+		}
+
+		if (metricRegistry != null) {
+			metricRegistry.shutdown().get();
+		}
+	}
+
+	@Test
+	public void eventHandlingInTaskFailureFailsTask() throws Exception {
+		final JobID jobId = new JobID();
+		final ExecutionAttemptID eid = new ExecutionAttemptID();
+
+		try (TaskSubmissionTestEnvironment env = createExecutorWithRunningTask(jobId, eid, OperatorEventFailingInvokable.class)) {
+			final TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
+			final CompletableFuture<?> resultFuture = tmGateway.sendOperatorEventToTask(eid, new OperatorID(), new SerializedValue<>(null));
+
+			assertThat(resultFuture, futureWillCompleteExceptionally(FlinkException.class, Duration.ofSeconds(10)));
+			assertEquals(ExecutionState.FAILED, env.getTaskSlotTable().getTask(eid).getExecutionState());
+		}
+	}
+
+	@Test
+	public void eventToCoordinatorDeliveryFailureFailsTask() throws Exception {
+		final JobID jobId = new JobID();
+		final ExecutionAttemptID eid = new ExecutionAttemptID();
+
+		try (TaskSubmissionTestEnvironment env = createExecutorWithRunningTask(jobId, eid, OperatorEventSendingInvokable.class)) {
+			final Task task = env.getTaskSlotTable().getTask(eid);
+
+			task.getExecutingThread().join(10_000);
+			assertEquals(ExecutionState.FAILED, task.getExecutionState());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  test setup helpers
+	// ------------------------------------------------------------------------
+
+	private TaskSubmissionTestEnvironment createExecutorWithRunningTask(
+			JobID jobId,
+			ExecutionAttemptID executionAttemptId,
+			Class<? extends AbstractInvokable> invokableClass) throws Exception {
+
+		final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
+				jobId, executionAttemptId, invokableClass);
+
+		final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();
+
+		final JobMasterId token = JobMasterId.generate();
+		final TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(jobId)
+				.setJobMasterId(token)
+				.setSlotSize(1)
+				.addTaskManagerActionListener(executionAttemptId, ExecutionState.RUNNING, taskRunningFuture)
+				.setMetricQueryServiceAddress(metricRegistry.getMetricQueryServiceGatewayRpcAddress())
+				.setJobMasterGateway(new TestingJobMasterGatewayBuilder()
+					.setFencingTokenSupplier(() -> token)
+					.setOperatorEventSender((eio, oid, value) -> {
+						throw new RuntimeException();
+					})
+					.build())
+				.build();
+
+		env.getTaskSlotTable().allocateSlot(0, jobId, tdd.getAllocationId(), Time.seconds(60));
+
+		final TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
+		tmGateway.submitTask(tdd, env.getJobMasterId(), Time.seconds(10)).get();
+		taskRunningFuture.get();
+
+		return env;
+	}
+
+	private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(
+			JobID jobId,
+			ExecutionAttemptID executionAttemptId,
+			Class<? extends AbstractInvokable> invokableClass) throws IOException {
+
+		return TaskExecutorSubmissionTest.createTaskDeploymentDescriptor(
+				jobId,
+				"test job",
+				executionAttemptId,
+				new SerializedValue<>(new ExecutionConfig()),
+				"test task",
+				64,
+				3,
+				17,
+				0,
+				new Configuration(),
+				new Configuration(),
+				invokableClass.getName(),
+				Collections.emptyList(),
+				Collections.emptyList(),
+				Collections.emptyList(),
+				Collections.emptyList(),
+				0);
+	}
+
+	// ------------------------------------------------------------------------
+	//  test mocks
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Test invokable that fails when receiving an operator event.
+	 */
+	public static final class OperatorEventFailingInvokable extends CancelableInvokable {
+
+		public OperatorEventFailingInvokable(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws InterruptedException {
+			waitUntilCancelled();
+		}
+
+		@Override
+		public void dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event) throws FlinkException {
+			throw new FlinkException("test exception");
+		}
+	}
+
+	/**
+	 * Test invokable that sends an operator event to the coordinator when it is invoked.
+	 */
+	public static final class OperatorEventSendingInvokable extends CancelableInvokable {
+
+		public OperatorEventSendingInvokable(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			getEnvironment().getOperatorCoordinatorEventGateway()
+				.sendOperatorEventToCoordinator(new OperatorID(), new SerializedValue<>(new TestOperatorEvent()));
+
+			waitUntilCancelled();
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index 11b0976..1c447d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -32,15 +32,19 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.TaskBackPressureResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.function.TriConsumer;
+import org.apache.flink.util.function.TriFunction;
 
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -79,6 +83,8 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 
 	private final TriConsumer<JobID, Set<ResultPartitionID>, Set<ResultPartitionID>> releaseOrPromotePartitionsConsumer;
 
+	private final TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> operatorEventHandler;
+
 	TestingTaskExecutorGateway(
 			String address,
 			String hostname,
@@ -91,7 +97,9 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 			Consumer<Exception> disconnectResourceManagerConsumer,
 			Function<ExecutionAttemptID, CompletableFuture<Acknowledge>> cancelTaskFunction,
 			Supplier<CompletableFuture<Boolean>> canBeReleasedSupplier,
-			TriConsumer<JobID, Set<ResultPartitionID>, Set<ResultPartitionID>> releaseOrPromotePartitionsConsumer) {
+			TriConsumer<JobID, Set<ResultPartitionID>, Set<ResultPartitionID>> releaseOrPromotePartitionsConsumer,
+			TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> operatorEventHandler) {
+
 		this.address = Preconditions.checkNotNull(address);
 		this.hostname = Preconditions.checkNotNull(hostname);
 		this.heartbeatJobManagerConsumer = Preconditions.checkNotNull(heartbeatJobManagerConsumer);
@@ -104,6 +112,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 		this.cancelTaskFunction = cancelTaskFunction;
 		this.canBeReleasedSupplier = canBeReleasedSupplier;
 		this.releaseOrPromotePartitionsConsumer = releaseOrPromotePartitionsConsumer;
+		this.operatorEventHandler = operatorEventHandler;
 	}
 
 	@Override
@@ -187,6 +196,14 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 	}
 
 	@Override
+	public CompletableFuture<Acknowledge> sendOperatorEventToTask(
+			ExecutionAttemptID task,
+			OperatorID operator,
+			SerializedValue<OperatorEvent> evt) {
+		return operatorEventHandler.apply(task, operator, evt);
+	}
+
+	@Override
 	public String getAddress() {
 		return address;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
index 80f321d..7e1b0f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
@@ -27,11 +27,15 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.function.TriConsumer;
+import org.apache.flink.util.function.TriFunction;
 
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -55,6 +59,7 @@ public class TestingTaskExecutorGatewayBuilder {
 	private static final Consumer<Exception> NOOP_DISCONNECT_RESOURCE_MANAGER_CONSUMER = ignored -> {};
 	private static final Function<ExecutionAttemptID, CompletableFuture<Acknowledge>> NOOP_CANCEL_TASK_FUNCTION = ignored -> CompletableFuture.completedFuture(Acknowledge.get());
 	private static final TriConsumer<JobID, Set<ResultPartitionID>, Set<ResultPartitionID>> NOOP_RELEASE_PARTITIONS_CONSUMER = (ignoredA, ignoredB, ignoredC) -> {};
+	private static final TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> DEFAULT_OPERATOR_EVENT_HANDLER = (a, b, c) -> CompletableFuture.completedFuture(Acknowledge.get());
 
 	private String address = "foobar:1234";
 	private String hostname = "foobar";
@@ -68,6 +73,7 @@ public class TestingTaskExecutorGatewayBuilder {
 	private Function<ExecutionAttemptID, CompletableFuture<Acknowledge>> cancelTaskFunction = NOOP_CANCEL_TASK_FUNCTION;
 	private Supplier<CompletableFuture<Boolean>> canBeReleasedSupplier = () -> CompletableFuture.completedFuture(true);
 	private TriConsumer<JobID, Set<ResultPartitionID>, Set<ResultPartitionID>> releaseOrPromotePartitionsConsumer = NOOP_RELEASE_PARTITIONS_CONSUMER;
+	private TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> operatorEventHandler = DEFAULT_OPERATOR_EVENT_HANDLER;
 
 	public TestingTaskExecutorGatewayBuilder setAddress(String address) {
 		this.address = address;
@@ -129,6 +135,11 @@ public class TestingTaskExecutorGatewayBuilder {
 		return this;
 	}
 
+	public TestingTaskExecutorGatewayBuilder setOperatorEventHandler(TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> operatorEventHandler) {
+		this.operatorEventHandler = operatorEventHandler;
+		return this;
+	}
+
 	public TestingTaskExecutorGateway createTestingTaskExecutorGateway() {
 		return new TestingTaskExecutorGateway(
 			address,
@@ -142,6 +153,7 @@ public class TestingTaskExecutorGatewayBuilder {
 			disconnectResourceManagerConsumer,
 			cancelTaskFunction,
 			canBeReleasedSupplier,
-			releaseOrPromotePartitionsConsumer);
+			releaseOrPromotePartitionsConsumer,
+			operatorEventHandler);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskOperatorEventGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskOperatorEventGateway.java
new file mode 100644
index 0000000..55d929f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskOperatorEventGateway.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.SerializedValue;
+
+/**
+ * A no-op test / mock implementation of the {@link TaskOperatorEventGateway}; see {@link JobMasterOperatorEventGateway} for the JobMaster-side interface.
+ */
+public class NoOpTaskOperatorEventGateway implements TaskOperatorEventGateway {
+
+	@Override
+	public void sendOperatorEventToCoordinator(OperatorID operatorID, SerializedValue<OperatorEvent> event) {}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index e048739..33cb0b6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -209,6 +209,7 @@ public class TaskAsyncCallTest extends TestLogger {
 			mock(TaskManagerActions.class),
 			mock(InputSplitProvider.class),
 			mock(CheckpointResponder.class),
+			new NoOpTaskOperatorEventGateway(),
 			new TestGlobalAggregateManager(),
 			blobService,
 			libCache,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
index 925b388..2c9a535 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
@@ -213,6 +213,7 @@ public final class TestTaskBuilder {
 			taskManagerActions,
 			new MockInputSplitProvider(),
 			new TestCheckpointResponder(),
+			new NoOpTaskOperatorEventGateway(),
 			new TestGlobalAggregateManager(),
 			blobCacheService,
 			libraryCacheManager,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CancelableInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CancelableInvokable.java
new file mode 100644
index 0000000..089f881
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CancelableInvokable.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * An {@link AbstractInvokable} that blocks at some point until cancelled.
+ *
+ * <p>Subclasses typically call the {@link #waitUntilCancelled()} method somewhere in their
+ * {@link #invoke()} method. {@link #cancel()} sets the flag and wakes up all waiting threads.
+ */
+public abstract class CancelableInvokable extends AbstractInvokable {
+
+	private volatile boolean canceled;
+
+	protected CancelableInvokable(Environment environment) {
+		super(environment);
+	}
+
+	@Override
+	public synchronized void cancel() {
+		canceled = true;
+		// wake up threads blocked in waitUntilCancelled(); without this only an interrupt would end the wait
+		notifyAll();
+	}
+
+	protected synchronized void waitUntilCancelled() throws InterruptedException {
+		while (!canceled) {
+			wait();
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index c5a5516..c7d2f6a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
@@ -214,6 +215,7 @@ public class JvmExitOnFatalErrorTest {
 						new NoOpTaskManagerActions(),
 						new NoOpInputSplitProvider(),
 						new NoOpCheckpointResponder(),
+						new NoOpTaskOperatorEventGateway(),
 						new TestGlobalAggregateManager(),
 						blobService,
 						new BlobLibraryCacheManager(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializableFunction.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializableFunction.java
new file mode 100644
index 0000000..60e9637
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializableFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.annotation.Public;
+
+import java.io.Serializable;
+import java.util.function.Function;
+
+/**
+ * A {@link Function} that is also {@link Serializable}, so lambdas and method references targeting it are serializable.
+ */
+@Public
+@FunctionalInterface
+public interface SerializableFunction<T, R> extends Function<T, R>, Serializable {}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index f273fef..e394069 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -30,9 +30,12 @@ import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput;
 import org.apache.flink.streaming.api.collector.selector.DirectedOutput;
@@ -56,7 +59,9 @@ import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorFactory;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.XORShiftRandom;
 
 import org.slf4j.Logger;
@@ -97,6 +102,8 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 
 	private final OP headOperator;
 
+	private final OperatorEventDispatcherImpl operatorEventDispatcher;
+
 	/**
 	 * Current status of the input stream of the operator chain.
 	 * Watermarks explicitly generated by operators in the chain (i.e. timestamp
@@ -109,6 +116,10 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 			StreamTask<OUT, OP> containingTask,
 			RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate) {
 
+		this.operatorEventDispatcher = new OperatorEventDispatcherImpl(
+				containingTask.getEnvironment().getUserClassLoader(),
+				containingTask.getEnvironment().getOperatorCoordinatorEventGateway());
+
 		final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
 		final StreamConfig configuration = containingTask.getConfiguration();
 
@@ -195,6 +206,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 		this.streamOutputs = checkNotNull(streamOutputs);
 		this.chainEntryPoint = checkNotNull(chainEntryPoint);
 		this.headOperator = checkNotNull(headOperator);
+		this.operatorEventDispatcher = null;
 	}
 
 	@Override
@@ -202,6 +214,14 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 		return streamStatus;
 	}
 
+	public OperatorEventDispatcher getOperatorEventDispatcher() {
+		return operatorEventDispatcher;
+	}
+
+	public void dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event) throws FlinkException {
+		operatorEventDispatcher.dispatchEventToHandlers(operator, event);
+	}
+
 	@Override
 	public void toggleStreamStatus(StreamStatus status) {
 		if (!status.equals(this.streamStatus)) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java
new file mode 100644
index 0000000..3407d5b
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link OperatorEventDispatcher}.
+ *
+ * <p>This class is intended for single threaded use from the stream task mailbox.
+ */
+@Internal
+final class OperatorEventDispatcherImpl implements OperatorEventDispatcher {
+
+	/** Handlers registered so far, keyed by the ID of the operator they belong to. */
+	private final Map<OperatorID, OperatorEventHandler> handlers;
+
+	/** Class loader used to deserialize incoming events. */
+	private final ClassLoader classLoader;
+
+	private final TaskOperatorEventGateway toCoordinator;
+
+	OperatorEventDispatcherImpl(ClassLoader classLoader, TaskOperatorEventGateway toCoordinator) {
+		this.classLoader = checkNotNull(classLoader);
+		this.toCoordinator = checkNotNull(toCoordinator);
+		this.handlers = new HashMap<>();
+	}
+
+	/** Deserializes the event and hands it to the handler registered for the operator; fails if there is none. */
+	void dispatchEventToHandlers(OperatorID operatorID, SerializedValue<OperatorEvent> serializedEvent) throws FlinkException {
+		final OperatorEvent evt;
+		try {
+			evt = serializedEvent.deserializeValue(classLoader);
+		}
+		catch (IOException | ClassNotFoundException e) {
+			throw new FlinkException("Could not deserialize operator event", e);
+		}
+
+		final OperatorEventHandler handler = handlers.get(operatorID);
+		if (handler == null) {
+			throw new FlinkException("Operator not registered for operator events");
+		}
+		handler.handleOperatorEvent(evt);
+	}
+
+	@Override
+	public OperatorEventGateway registerEventHandler(OperatorID operator, OperatorEventHandler handler) {
+		// reject nulls up front: a null handler would be stored silently and look "not registered" on dispatch
+		checkNotNull(operator);
+		checkNotNull(handler);
+		if (handlers.putIfAbsent(operator, handler) != null) {
+			throw new IllegalStateException("already a handler registered for this operatorId");
+		}
+		return new OperatorEventGatewayImpl(toCoordinator, operator);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** Gateway bound to a single operator ID; serializes each event before forwarding it to the coordinator gateway. */
+	private static final class OperatorEventGatewayImpl implements OperatorEventGateway {
+
+		private final TaskOperatorEventGateway toCoordinator;
+
+		private final OperatorID operatorId;
+
+		private OperatorEventGatewayImpl(TaskOperatorEventGateway toCoordinator, OperatorID operatorId) {
+			this.toCoordinator = toCoordinator;
+			this.operatorId = operatorId;
+		}
+
+		@Override
+		public void sendEventToCoordinator(OperatorEvent event) {
+			final SerializedValue<OperatorEvent> serializedEvent;
+			try {
+				serializedEvent = new SerializedValue<>(event);
+			}
+			catch (IOException e) {
+				// this is not a recoverable situation, so we wrap this in an
+				// unchecked exception and let it bubble up
+				throw new FlinkRuntimeException("Cannot serialize operator event", e);
+			}
+
+			toCoordinator.sendOperatorEventToCoordinator(operatorId, serializedEvent);
+		}
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index b0fb9c5..d633c6f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -44,6 +44,8 @@ import org.apache.flink.runtime.io.network.api.writer.SingleRecordWriter;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -75,7 +77,9 @@ import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
 import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
 import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,6 +96,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -720,6 +725,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		return operatorChain;
 	}
 
+	public OperatorEventDispatcher getOperatorEventDispatcher() {
+		return operatorChain.getOperatorEventDispatcher();
+	}
+
 	RecordWriterOutput<?>[] getStreamOutputs() {
 		return operatorChain.getStreamOutputs();
 	}
@@ -971,6 +980,28 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	}
 
 	// ------------------------------------------------------------------------
+	//  Operator Events
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event) throws FlinkException {
+		try {
+			mailboxProcessor.getMainMailboxExecutor().execute(
+				() -> {
+					try {
+						operatorChain.dispatchOperatorEvent(operator, event);
+					} catch (Throwable t) {
+						mailboxProcessor.reportThrowable(t);
+					}
+				},
+				"dispatch operator event");
+		}
+		catch (RejectedExecutionException e) {
+			// this happens during shutdown, we can swallow this
+		}
+	}
+
+	// ------------------------------------------------------------------------
 	//  State backend
 	// ------------------------------------------------------------------------
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 5da1250..a485011 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -69,6 +69,7 @@ import org.apache.flink.runtime.taskexecutor.KvStateService;
 import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
 import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -278,6 +279,7 @@ public class InterruptSensitiveRestoreTest {
 			mock(TaskManagerActions.class),
 			mock(InputSplitProvider.class),
 			mock(CheckpointResponder.class),
+			new NoOpTaskOperatorEventGateway(),
 			new TestGlobalAggregateManager(),
 			blobService,
 			new BlobLibraryCacheManager(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index a31ce09..27a019c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.memory.MemoryManagerBuilder;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
@@ -51,6 +52,7 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
 import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
+import org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.Preconditions;
@@ -336,6 +338,11 @@ public class StreamMockEnvironment implements Environment {
 	public void declineCheckpoint(long checkpointId, Throwable cause) {}
 
 	@Override
+	public TaskOperatorEventGateway getOperatorCoordinatorEventGateway() {
+		return new NoOpTaskOperatorEventGateway();
+	}
+
+	@Override
 	public void failExternally(Throwable cause) {
 		if (externalExceptionHandler != null) {
 			externalExceptionHandler.accept(cause);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index e29d58d..06d394c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -71,6 +71,7 @@ import org.apache.flink.runtime.taskexecutor.KvStateService;
 import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
 import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
@@ -182,6 +183,7 @@ public class StreamTaskTerminationTest extends TestLogger {
 			mock(TaskManagerActions.class),
 			mock(InputSplitProvider.class),
 			mock(CheckpointResponder.class),
+			new NoOpTaskOperatorEventGateway(),
 			new TestGlobalAggregateManager(),
 			blobService,
 			new BlobLibraryCacheManager(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index cd68812..b53a711 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -96,6 +96,7 @@ import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
 import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
+import org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
@@ -1254,6 +1255,7 @@ public class StreamTaskTest extends TestLogger {
 			taskManagerActions,
 			mock(InputSplitProvider.class),
 			mock(CheckpointResponder.class),
+			new NoOpTaskOperatorEventGateway(),
 			new TestGlobalAggregateManager(),
 			blobService,
 			libCache,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
index f5f54f3..622ccc1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
@@ -58,6 +58,7 @@ import org.apache.flink.runtime.taskexecutor.KvStateService;
 import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
 import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
@@ -260,6 +261,7 @@ public class SynchronousCheckpointITCase {
 				mock(TaskManagerActions.class),
 				mock(InputSplitProvider.class),
 				mock(CheckpointResponder.class),
+				new NoOpTaskOperatorEventGateway(),
 				new TestGlobalAggregateManager(),
 				blobService,
 				libCache,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index 6606e2b..f87e0e4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -75,6 +75,7 @@ import org.apache.flink.runtime.taskexecutor.KvStateService;
 import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
 import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -218,6 +219,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
 				mock(TaskManagerActions.class),
 				mock(InputSplitProvider.class),
 				checkpointResponder,
+				new NoOpTaskOperatorEventGateway(),
 				new TestGlobalAggregateManager(),
 				blobService,
 				new BlobLibraryCacheManager(
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java
new file mode 100644
index 0000000..0303272
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Some reusable hamcrest matchers for Flink.
+ *
+ * <p>The matchers in this class check that a {@link CompletableFuture} completed (or will
+ * complete) exceptionally. A cancelled future is treated as having failed with a
+ * {@link CancellationException}.
+ */
+public class FlinkMatchers {
+
+	// ------------------------------------------------------------------------
+	//  factories
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Checks whether {@link CompletableFuture} completed already exceptionally with a specific exception type.
+	 *
+	 * @param exceptionType the type that the failure must be an instance of, must not be null
+	 * @return a matcher that accepts only futures that are already done and failed with that type
+	 * @throws NullPointerException if {@code exceptionType} is null
+	 */
+	public static <T, E extends Throwable> FutureFailedMatcher<T> futureFailedWith(Class<E> exceptionType) {
+		Objects.requireNonNull(exceptionType, "exceptionType should not be null");
+		return new FutureFailedMatcher<>(exceptionType);
+	}
+
+	/**
+	 * Checks whether {@link CompletableFuture} will complete exceptionally within a certain time.
+	 *
+	 * @param exceptionType the type that the failure must be an instance of, must not be null
+	 * @param timeout the maximum time the matcher waits for the future, must not be null
+	 * @return a matcher that blocks up to {@code timeout} and accepts futures failing with that type
+	 * @throws NullPointerException if {@code exceptionType} or {@code timeout} is null
+	 */
+	public static <T, E extends Throwable> FutureWillFailMatcher<T> futureWillCompleteExceptionally(
+			Class<E> exceptionType,
+			Duration timeout) {
+		Objects.requireNonNull(exceptionType, "exceptionType should not be null");
+		Objects.requireNonNull(timeout, "timeout should not be null");
+		return new FutureWillFailMatcher<>(exceptionType, timeout);
+	}
+
+	/**
+	 * Checks whether {@link CompletableFuture} will complete exceptionally within a certain time,
+	 * accepting any {@link Throwable} as the failure.
+	 *
+	 * @param timeout the maximum time the matcher waits for the future, must not be null
+	 * @return a matcher that blocks up to {@code timeout} and accepts futures that fail
+	 * @throws NullPointerException if {@code timeout} is null
+	 */
+	public static <T> FutureWillFailMatcher<T> futureWillCompleteExceptionally(Duration timeout) {
+		return futureWillCompleteExceptionally(Throwable.class, timeout);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** This class should not be instantiated. */
+	private FlinkMatchers() {}
+
+	// ------------------------------------------------------------------------
+	//  matcher implementations
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Matcher for futures that are already completed exceptionally when the match is evaluated.
+	 * The check only uses non-blocking accessors of the future.
+	 */
+	private static final class FutureFailedMatcher<T> extends TypeSafeDiagnosingMatcher<CompletableFuture<T>> {
+
+		private final Class<? extends Throwable> expectedException;
+
+		FutureFailedMatcher(Class<? extends Throwable> expectedException) {
+			super(CompletableFuture.class);
+			this.expectedException = expectedException;
+		}
+
+		@Override
+		protected boolean matchesSafely(CompletableFuture<T> future, Description mismatchDescription) {
+			if (!future.isDone()) {
+				mismatchDescription.appendText("Future is not completed.");
+				return false;
+			}
+
+			if (!future.isCompletedExceptionally()) {
+				// the regular result may legitimately be null (e.g. CompletableFuture<Void>),
+				// so it is only reported and never asserted on
+				final Object result = future.getNow(null);
+				mismatchDescription.appendText("Future did not complete exceptionally, but instead regularly with: " + result);
+				return false;
+			}
+
+			Throwable failure;
+			try {
+				future.getNow(null);
+				// not reachable: an exceptionally completed future always throws from getNow()
+				throw new Error();
+			}
+			catch (CompletionException e) {
+				failure = e.getCause();
+			}
+			catch (CancellationException e) {
+				// a cancelled future throws the CancellationException itself, without a wrapper
+				failure = e;
+			}
+
+			// isInstance() returns false for null, which covers a wrapper without a cause
+			if (expectedException.isInstance(failure)) {
+				return true;
+			}
+
+			mismatchDescription.appendText("Future completed with different exception: " + failure);
+			return false;
+		}
+
+		@Override
+		public void describeTo(Description description) {
+			description.appendText("A CompletableFuture that failed with: " + expectedException.getName());
+		}
+	}
+
+	/**
+	 * Matcher that waits up to a timeout for the future to complete and accepts it only if it
+	 * completed exceptionally with the expected exception type.
+	 */
+	private static final class FutureWillFailMatcher<T> extends TypeSafeDiagnosingMatcher<CompletableFuture<T>> {
+
+		private final Class<? extends Throwable> expectedException;
+
+		private final Duration timeout;
+
+		FutureWillFailMatcher(Class<? extends Throwable> expectedException, Duration timeout) {
+			super(CompletableFuture.class);
+			this.expectedException = expectedException;
+			this.timeout = timeout;
+		}
+
+		@Override
+		protected boolean matchesSafely(CompletableFuture<T> future, Description mismatchDescription) {
+			Throwable failure;
+			try {
+				final Object result = future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+				mismatchDescription.appendText("Future did not complete exceptionally, but instead regularly with: " + result);
+				return false;
+			}
+			catch (InterruptedException e) {
+				// restore the interruption flag before aborting the test
+				Thread.currentThread().interrupt();
+				throw new Error("interrupted test", e);
+			}
+			catch (TimeoutException e) {
+				mismatchDescription.appendText("Future did not complete within " + timeout.toMillis() + " milliseconds.");
+				return false;
+			}
+			catch (ExecutionException e) {
+				failure = e.getCause();
+			}
+			catch (CancellationException e) {
+				// a cancelled future throws the CancellationException itself, without a wrapper
+				failure = e;
+			}
+
+			// isInstance() returns false for null, which covers a wrapper without a cause
+			if (!expectedException.isInstance(failure)) {
+				mismatchDescription.appendText("Future completed with different exception: " + failure);
+				return false;
+			}
+			return true;
+		}
+
+		@Override
+		public void describeTo(Description description) {
+			description.appendText("A CompletableFuture that will fail within " +
+				timeout.toMillis() + " milliseconds with: " + expectedException.getName());
+		}
+	}
+}