Posted to commits@flink.apache.org by se...@apache.org on 2020/05/18 18:15:58 UTC

[flink] branch release-1.11 updated: [FLINK-17781][coordination] Use Scheduler and MainThreadExecutor in OperatorCoordinator.Context.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 3bf6596  [FLINK-17781][coordination] Use Scheduler and MainThreadExecutor in OperatorCoordinator.Context.
3bf6596 is described below

commit 3bf6596b99e0d7760398acdfeccf7184d3ddd900
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon May 18 01:32:49 2020 +0200

    [FLINK-17781][coordination] Use Scheduler and MainThreadExecutor in OperatorCoordinator.Context.
    
    This needs (unfortunately) a switch to lazy initialization of the context:
    
      - The Scheduler needs to be created before the OperatorCoordinator Contexts are created.
        One could achieve that by creating the Coordinators lazily, after the Scheduler.

      - However, the Scheduler restores the savepoints as part of the scheduler creation, when the ExecutionGraph
        and the CheckpointCoordinator are created early in the constructor.

      - That means the OperatorCoordinator (or at least a placeholder component, here the
        OperatorCoordinatorHolder) needs to exist already at that point to accept the restored state.
    
    That brings us to a cyclic dependency:
    
      - OperatorCoordinator (context) needs Scheduler and MainThreadExecutor
      - Scheduler and MainThreadExecutor need constructed ExecutionGraph
      - ExecutionGraph needs CheckpointCoordinator
      - CheckpointCoordinator needs OperatorCoordinator
    
    Breaking the Cycle
    
    To break this cyclic dependency, this change introduces a form of lazy initialization
    (sketched below):
      - We eagerly create the OperatorCoordinators so they exist for state restore.
      - We provide an uninitialized context to them.
      - When the Scheduler is started (after leadership is granted), we initialize the
        context with the (then readily constructed) Scheduler and MainThreadExecutor.
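
    For illustration, a minimal sketch of the resulting two-step wiring. The method and
    variable names below are placeholders, not part of this change; the real call sites are the
    ExecutionJobVertex constructor, SchedulerBase#setMainThreadExecutor and
    SchedulerBase#startAllOperatorCoordinators:

      void wireCoordinatorSketch(
              SerializedValue<OperatorCoordinator.Provider> provider,
              ExecutionJobVertex jobVertex,
              ClassLoader userClassLoader,
              SchedulerNG scheduler,
              Executor mainThreadExecutor) throws Exception {

          // 1) eagerly, while the ExecutionGraph is built: create the holder (and with it the
          //    coordinator and its still-uninitialized context), so that restored savepoint
          //    state already has a place to go
          final OperatorCoordinatorHolder holder =
                  OperatorCoordinatorHolder.create(provider, jobVertex, userClassLoader);

          // 2) later, once the Scheduler and its main thread executor exist: complete the context
          holder.lazyInitialize(scheduler, mainThreadExecutor);

          // 3) only a fully initialized coordinator may be started
          holder.start();
      }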
    
    This closes #12225
---
 .../runtime/executiongraph/ExecutionGraph.java     |   9 +-
 .../runtime/executiongraph/ExecutionJobVertex.java |  41 ++--
 .../coordination/OperatorCoordinator.java          |  10 +
 .../coordination/OperatorCoordinatorHolder.java    | 236 +++++++++++++++++++++
 .../coordination/OperatorCoordinatorUtil.java      |  61 ------
 .../flink/runtime/scheduler/SchedulerBase.java     |  56 +++--
 .../flink/runtime/scheduler/SchedulerNG.java       |  10 +
 .../OperatorCoordinatorSchedulerTest.java          |  11 +-
 8 files changed, 328 insertions(+), 106 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index e348059..b84a86f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -60,12 +60,11 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
-import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.scheduler.InternalFailuresListener;
 import org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology;
@@ -570,10 +569,10 @@ public class ExecutionGraph implements AccessExecutionGraph {
 	private Collection<OperatorCoordinatorCheckpointContext> buildOpCoordinatorCheckpointContexts() {
 		final ArrayList<OperatorCoordinatorCheckpointContext> contexts = new ArrayList<>();
 		for (final ExecutionJobVertex vertex : verticesInCreationOrder) {
-			for (final Map.Entry<OperatorID, OperatorCoordinator> coordinator : vertex.getOperatorCoordinatorMap().entrySet()) {
+			for (final OperatorCoordinatorHolder coordinator : vertex.getOperatorCoordinators()) {
 				contexts.add(new OperatorCoordinatorCheckpointContext(
-						coordinator.getValue(),
-						coordinator.getKey(),
+						coordinator,
+						coordinator.getOperatorId(),
 						vertex.getMaxParallelism(),
 						vertex.getParallelism()));
 			}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index ee2cd2a..0b5f0ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -44,13 +44,13 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
-import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorUtil;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.types.Either;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
@@ -119,7 +119,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	 */
 	private Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey = null;
 
-	private final Map<OperatorID, OperatorCoordinator> operatorCoordinators;
+	private final Collection<OperatorCoordinatorHolder> operatorCoordinators;
 
 	private InputSplitAssigner splitAssigner;
 
@@ -229,16 +229,20 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 			}
 		}
 
-		try {
-			final Map<OperatorID, OperatorCoordinator> coordinators = OperatorCoordinatorUtil.instantiateCoordinators(
-					jobVertex.getOperatorCoordinators(),
-					graph.getUserClassLoader(),
-					(opId) -> new ExecutionJobVertexCoordinatorContext(opId, this));
-
-			this.operatorCoordinators = Collections.unmodifiableMap(coordinators);
-		}
-		catch (IOException | ClassNotFoundException e) {
-			throw new JobException("Cannot instantiate the coordinator for operator " + getName(), e);
+		final List<SerializedValue<OperatorCoordinator.Provider>> coordinatorProviders = getJobVertex().getOperatorCoordinators();
+		if (coordinatorProviders.isEmpty()) {
+			this.operatorCoordinators = Collections.emptyList();
+		} else {
+			final ArrayList<OperatorCoordinatorHolder> coordinators = new ArrayList<>(coordinatorProviders.size());
+			try {
+				for (final SerializedValue<OperatorCoordinator.Provider> provider : coordinatorProviders) {
+					coordinators.add(OperatorCoordinatorHolder.create(provider, this, graph.getUserClassLoader()));
+				}
+			} catch (Exception | LinkageError e) {
+				IOUtils.closeAllQuietly(coordinators);
+				throw new JobException("Cannot instantiate the coordinator for operator " + getName(), e);
+			}
+			this.operatorCoordinators = Collections.unmodifiableList(coordinators);
 		}
 
 		// set up the input splits, if the vertex has any
@@ -371,19 +375,10 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		return getJobVertex().getInputDependencyConstraint();
 	}
 
-	@Nullable
-	public OperatorCoordinator getOperatorCoordinator(OperatorID operatorId) {
-		return operatorCoordinators.get(operatorId);
-	}
-
-	public Map<OperatorID, OperatorCoordinator> getOperatorCoordinatorMap() {
+	public Collection<OperatorCoordinatorHolder> getOperatorCoordinators() {
 		return operatorCoordinators;
 	}
 
-	public Collection<OperatorCoordinator> getOperatorCoordinators() {
-		return operatorCoordinators.values();
-	}
-
 	public Either<SerializedValue<TaskInformation>, PermanentBlobKey> getTaskInformationOrBlobKey() throws IOException {
 		// only one thread should offload the task information, so let's also let only one thread
 		// serialize the task information!
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
index 27e0910..cb388b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
@@ -32,6 +32,16 @@ import java.util.concurrent.CompletableFuture;
  *
  * <p>Operator coordinators are for example source and sink coordinators that discover and assign
  * work, or aggregate and commit metadata.
+ *
+ * <h2>Thread Model</h2>
+ *
+ * <p>All coordinator methods are called by the JobManager's main thread (mailbox thread). That means
+ * these methods must not, under any circumstances, perform blocking operations (like I/O or waiting on
+ * locks or futures). That would run a high risk of bringing down the entire JobManager.
+ *
+ * <p>Coordinators that involve more complex operations should hence spawn threads to handle the I/O work.
+ * The methods on the {@link Context} are safe to be called from a thread other than the one that
+ * calls the Coordinator's methods.
  */
 public interface OperatorCoordinator extends AutoCloseable {
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
new file mode 100644
index 0000000..43c47fe
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TemporaryClassLoaderContext;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A holder for an {@link OperatorCoordinator.Context} and all the facilities around it that are
+ * needed for the interaction between the Coordinator, the Scheduler, the Checkpoint Coordinator, etc.
+ *
+ * <p>The holder is itself a {@link OperatorCoordinator} and forwards all calls to the actual coordinator.
+ * That way, we can make adjustments to assumptions about the threading model and message/call forwarding
+ * without needing to adjust all the call sites that interact with the coordinator.
+ *
+ * <p>This is also needed, unfortunately, because we need a lazy two-step initialization:
+ * When the execution graph is created, we need to create the coordinators (or the holders, to be specific)
+ * because the CheckpointCoordinator is also created in the ExecutionGraph and needs access to them.
+ * However, the coordinator contexts can only be fully initialized after the SchedulerNG has been created,
+ * because the coordinators need a reference to it for the failure calls.
+ */
+public class OperatorCoordinatorHolder implements OperatorCoordinator {
+
+	private final OperatorCoordinator coordinator;
+	private final OperatorID operatorId;
+	private final LazyInitializedCoordinatorContext context;
+
+	private OperatorCoordinatorHolder(
+			final OperatorID operatorId,
+			final OperatorCoordinator coordinator,
+			final LazyInitializedCoordinatorContext context) {
+
+		this.operatorId = checkNotNull(operatorId);
+		this.coordinator = checkNotNull(coordinator);
+		this.context = checkNotNull(context);
+	}
+
+	// ------------------------------------------------------------------------
+
+	public OperatorID getOperatorId() {
+		return operatorId;
+	}
+
+	public OperatorCoordinator getCoordinator() {
+		return coordinator;
+	}
+
+	public void lazyInitialize(SchedulerNG scheduler, Executor schedulerExecutor) {
+		context.lazyInitialize(scheduler, schedulerExecutor);
+	}
+
+	// ------------------------------------------------------------------------
+	//  OperatorCoordinator Interface
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void start() throws Exception {
+		checkState(context.isInitialized(), "Coordinator Context is not yet initialized");
+		coordinator.start();
+	}
+
+	@Override
+	public void close() throws Exception {
+		coordinator.close();
+		context.unInitialize();
+	}
+
+	@Override
+	public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
+		coordinator.handleEventFromOperator(subtask, event);
+	}
+
+	@Override
+	public void subtaskFailed(int subtask, @Nullable Throwable reason) {
+		coordinator.subtaskFailed(subtask, reason);
+	}
+
+	@Override
+	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
+		return coordinator.checkpointCoordinator(checkpointId);
+	}
+
+	@Override
+	public void checkpointComplete(long checkpointId) {
+		coordinator.checkpointComplete(checkpointId);
+	}
+
+	@Override
+	public void resetToCheckpoint(byte[] checkpointData) throws Exception {
+		coordinator.resetToCheckpoint(checkpointData);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Factories
+	// ------------------------------------------------------------------------
+
+	public static OperatorCoordinatorHolder create(
+			SerializedValue<OperatorCoordinator.Provider> serializedProvider,
+			ExecutionJobVertex jobVertex,
+			ClassLoader classLoader) throws IOException, ClassNotFoundException {
+
+		try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
+			final OperatorCoordinator.Provider provider = serializedProvider.deserializeValue(classLoader);
+			final OperatorID opId = provider.getOperatorId();
+			final LazyInitializedCoordinatorContext context = new LazyInitializedCoordinatorContext(opId, jobVertex);
+			final OperatorCoordinator coordinator = provider.create(context);
+			return new OperatorCoordinatorHolder(opId, coordinator, context);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Nested Classes
+	// ------------------------------------------------------------------------
+
+	/**
+	 * An implementation of the {@link OperatorCoordinator.Context}.
+	 *
+	 * <p>All methods are safe to be called from threads other than the Scheduler's and the JobMaster's
+	 * main threads.
+	 *
+	 * <p>Implementation note: Ideally, we would like to operate purely against the scheduler
+	 * interface, but it is not exposing enough information at the moment.
+	 */
+	private static final class LazyInitializedCoordinatorContext implements OperatorCoordinator.Context {
+
+		private final OperatorID operatorId;
+		private final ExecutionJobVertex jobVertex;
+
+		private SchedulerNG scheduler;
+		private Executor schedulerExecutor;
+
+		public LazyInitializedCoordinatorContext(OperatorID operatorId, ExecutionJobVertex jobVertex) {
+			this.operatorId = checkNotNull(operatorId);
+			this.jobVertex = checkNotNull(jobVertex);
+		}
+
+		void lazyInitialize(SchedulerNG scheduler, Executor schedulerExecutor) {
+			this.scheduler = checkNotNull(scheduler);
+			this.schedulerExecutor = checkNotNull(schedulerExecutor);
+		}
+
+		void unInitialize() {
+			this.scheduler = null;
+			this.schedulerExecutor = null;
+		}
+
+		boolean isInitialized() {
+			return scheduler != null;
+		}
+
+		private void checkInitialized() {
+			checkState(isInitialized(), "Context was not yet initialized");
+		}
+
+		@Override
+		public OperatorID getOperatorId() {
+			return operatorId;
+		}
+
+		@Override
+		public CompletableFuture<Acknowledge> sendEvent(final OperatorEvent evt, final int targetSubtask) {
+			checkInitialized();
+
+			if (targetSubtask < 0 || targetSubtask >= currentParallelism()) {
+				throw new IllegalArgumentException(
+					String.format("subtask index %d out of bounds [0, %d).", targetSubtask, currentParallelism()));
+			}
+
+			final SerializedValue<OperatorEvent> serializedEvent;
+			try {
+				serializedEvent = new SerializedValue<>(evt);
+			}
+			catch (IOException e) {
+				// we do not expect that this exception is handled by the caller, so we make it
+				// unchecked so that it can bubble up
+				throw new FlinkRuntimeException("Cannot serialize operator event", e);
+			}
+
+			final Execution executionAttempt = jobVertex.getTaskVertices()[targetSubtask].getCurrentExecutionAttempt();
+			return executionAttempt.sendOperatorEvent(operatorId, serializedEvent);
+		}
+
+		@Override
+		public void failTask(final int subtask, final Throwable cause) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void failJob(final Throwable cause) {
+			checkInitialized();
+
+			final FlinkException e = new FlinkException("Global failure triggered by OperatorCoordinator for '" +
+				jobVertex.getName() + "' (operator " + operatorId + ").", cause);
+
+			schedulerExecutor.execute(() -> scheduler.handleGlobalFailure(e));
+		}
+
+		@Override
+		public int currentParallelism() {
+			checkInitialized();
+			return jobVertex.getParallelism();
+		}
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorUtil.java
deleted file mode 100644
index 154b547..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorUtil.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.coordination;
-
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.util.SerializedValue;
-import org.apache.flink.util.TemporaryClassLoaderContext;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-
-/**
- * A utility to for dealing with the {@link OperatorCoordinator}.
- */
-public final class OperatorCoordinatorUtil {
-
-	public static Map<OperatorID, OperatorCoordinator> instantiateCoordinators(
-			List<SerializedValue<OperatorCoordinator.Provider>> providers,
-			ClassLoader classLoader,
-			Function<OperatorID, OperatorCoordinator.Context> contextFactory) throws IOException, ClassNotFoundException {
-
-		try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
-
-			final HashMap<OperatorID, OperatorCoordinator> coordinators = new HashMap<>();
-
-			for (SerializedValue<OperatorCoordinator.Provider> serializedProvider : providers) {
-				final OperatorCoordinator.Provider provider = serializedProvider.deserializeValue(classLoader);
-				final OperatorID id = provider.getOperatorId();
-				final OperatorCoordinator.Context context = contextFactory.apply(id);
-				final OperatorCoordinator coordinator = provider.create(context);
-				coordinators.put(id, coordinator);
-			}
-
-			return coordinators;
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/** Utility class, not meant to be instantiated. */
-	private OperatorCoordinatorUtil() {}
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index c2c911a8..6f2ae47 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -81,6 +81,7 @@ import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
 import org.apache.flink.runtime.query.KvStateLocation;
@@ -169,7 +170,7 @@ public abstract class SchedulerBase implements SchedulerNG {
 
 	protected final ExecutionVertexVersioner executionVertexVersioner;
 
-	private final Map<OperatorID, OperatorCoordinator> coordinatorMap;
+	private final Map<OperatorID, OperatorCoordinatorHolder> coordinatorMap;
 
 	private ComponentMainThreadExecutor mainThreadExecutor = new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor(
 		"SchedulerBase is not initialized with proper main thread executor. " +
@@ -443,6 +444,7 @@ public abstract class SchedulerBase implements SchedulerNG {
 	@Override
 	public void setMainThreadExecutor(final ComponentMainThreadExecutor mainThreadExecutor) {
 		this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
+		initializeOperatorCoordinators(mainThreadExecutor);
 		executionGraph.start(mainThreadExecutor);
 	}
 
@@ -901,6 +903,21 @@ public abstract class SchedulerBase implements SchedulerNG {
 			.orElse("Unknown location");
 	}
 
+	// ------------------------------------------------------------------------
+	//  Operator Coordinators
+	//
+	//  Note: It may be worthwhile to move the OperatorCoordinators out
+	//        of the scheduler (have them owned by the JobMaster directly).
+	//        Then we could avoid routing these events through the scheduler and
+	//        doing this lazy initialization dance. However, this would require
+	//        that the Scheduler does not eagerly construct the CheckpointCoordinator
+	//        in the ExecutionGraph and does not eagerly restore the savepoint while
+	//        doing that. Because during savepoint restore, the OperatorCoordinators
+	//        (or at least their holders) already need to exist, to accept the restored
+	//        state. But some components they depend on (Scheduler and MainThreadExecutor)
+	//        are not fully usable and accessible at that point.
+	// ------------------------------------------------------------------------
+
 	@Override
 	public void deliverOperatorEventToCoordinator(
 			final ExecutionAttemptID taskExecutionId,
@@ -922,8 +939,7 @@ public abstract class SchedulerBase implements SchedulerNG {
 			throw new TaskNotRunningException("Task is not known or in state running on the JobManager.");
 		}
 
-		final ExecutionJobVertex ejv = exec.getVertex().getJobVertex();
-		final OperatorCoordinator coordinator = ejv.getOperatorCoordinator(operatorId);
+		final OperatorCoordinatorHolder coordinator = coordinatorMap.get(operatorId);
 		if (coordinator == null) {
 			throw new FlinkException("No coordinator registered for operator " + operatorId);
 		}
@@ -932,7 +948,7 @@ public abstract class SchedulerBase implements SchedulerNG {
 			coordinator.handleEventFromOperator(exec.getParallelSubtaskIndex(), evt);
 		} catch (Throwable t) {
 			ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
-			failJob(t);
+			handleGlobalFailure(t);
 		}
 	}
 
@@ -940,20 +956,30 @@ public abstract class SchedulerBase implements SchedulerNG {
 	public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
 			OperatorID operator,
 			CoordinationRequest request) throws FlinkException {
-		OperatorCoordinator coordinator = coordinatorMap.get(operator);
+
+		final OperatorCoordinatorHolder coordinatorHolder = coordinatorMap.get(operator);
+		if (coordinatorHolder == null) {
+			throw new FlinkException("Coordinator of operator " + operator + " does not exist");
+		}
+
+		final OperatorCoordinator coordinator = coordinatorHolder.getCoordinator();
 		if (coordinator instanceof CoordinationRequestHandler) {
 			return ((CoordinationRequestHandler) coordinator).handleCoordinationRequest(request);
-		} else if (coordinator != null) {
-			throw new FlinkException("Coordinator of operator " + operator + " cannot handle client event");
 		} else {
-			throw new FlinkException("Coordinator of operator " + operator + " does not exist");
+			throw new FlinkException("Coordinator of operator " + operator + " cannot handle client event");
+		}
+	}
+
+	private void initializeOperatorCoordinators(Executor mainThreadExecutor) {
+		for (OperatorCoordinatorHolder coordinatorHolder : getAllCoordinators()) {
+			coordinatorHolder.lazyInitialize(this, mainThreadExecutor);
 		}
 	}
 
 	private void startAllOperatorCoordinators() {
-		final Collection<OperatorCoordinator> coordinators = getAllCoordinators();
+		final Collection<OperatorCoordinatorHolder> coordinators = getAllCoordinators();
 		try {
-			for (OperatorCoordinator coordinator : coordinators) {
+			for (OperatorCoordinatorHolder coordinator : coordinators) {
 				coordinator.start();
 			}
 		}
@@ -968,15 +994,15 @@ public abstract class SchedulerBase implements SchedulerNG {
 		getAllCoordinators().forEach(IOUtils::closeQuietly);
 	}
 
-	private Collection<OperatorCoordinator> getAllCoordinators() {
+	private Collection<OperatorCoordinatorHolder> getAllCoordinators() {
 		return coordinatorMap.values();
 	}
 
-	private Map<OperatorID, OperatorCoordinator> createCoordinatorMap() {
-		Map<OperatorID, OperatorCoordinator> coordinatorMap = new HashMap<>();
+	private Map<OperatorID, OperatorCoordinatorHolder> createCoordinatorMap() {
+		Map<OperatorID, OperatorCoordinatorHolder> coordinatorMap = new HashMap<>();
 		for (ExecutionJobVertex vertex : executionGraph.getAllVertices().values()) {
-			for (Map.Entry<OperatorID, OperatorCoordinator> entry : vertex.getOperatorCoordinatorMap().entrySet()) {
-				coordinatorMap.put(entry.getKey(), entry.getValue());
+			for (OperatorCoordinatorHolder holder : vertex.getOperatorCoordinators()) {
+				coordinatorMap.put(holder.getOperatorId(), holder);
 			}
 		}
 		return coordinatorMap;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
index 07f7e7e..1675bc0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
@@ -127,6 +127,16 @@ public interface SchedulerNG {
 	CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean advanceToEndOfEventTime);
 
 	// ------------------------------------------------------------------------
+	//  Operator Coordinator related methods
+	//
+	//  These are necessary as long as the Operator Coordinators are part of the
+	//  scheduler. There are good reasons to pull them out of the Scheduler and
+	//  make them directly a part of the JobMaster. However, we would need to
+	//  rework the complete CheckpointCoordinator initialization before we can
+	//  do that, because the CheckpointCoordinator is initialized (and restores
+	//  savepoint) in the scheduler constructor, which requires the coordinators
+	//  to be there as well.
+	// ------------------------------------------------------------------------
 
 	/**
 	 * Delivers the given OperatorEvent to the {@link OperatorCoordinator} with the given {@link OperatorID}.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 935e0ec..14a136d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -61,6 +61,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
@@ -501,8 +502,14 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		final ExecutionJobVertex vertexWithCoordinator = getJobVertex(scheduler, testVertexId);
 		assertNotNull("vertex for coordinator not found", vertexWithCoordinator);
 
-		final OperatorCoordinator coordinator = vertexWithCoordinator.getOperatorCoordinator(testOperatorId);
-		assertNotNull("vertex does not contain coordinator", coordinator);
+		final Optional<OperatorCoordinatorHolder> coordinatorOptional = vertexWithCoordinator
+				.getOperatorCoordinators()
+				.stream()
+				.filter((holder) -> holder.getOperatorId().equals(testOperatorId))
+				.findFirst();
+		assertTrue("vertex does not contain coordinator", coordinatorOptional.isPresent());
+
+		final OperatorCoordinator coordinator = coordinatorOptional.get().getCoordinator();
 		assertThat(coordinator, instanceOf(TestingOperatorCoordinator.class));
 
 		return (TestingOperatorCoordinator) coordinator;