You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/18 18:15:58 UTC
[flink] branch release-1.11 updated: [FLINK-17781][coordination]
Use Scheduler and MainThreadExecutor in OperatorCoordinator.Context.
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 3bf6596 [FLINK-17781][coordination] Use Scheduler and MainThreadExecutor in OperatorCoordinator.Context.
3bf6596 is described below
commit 3bf6596b99e0d7760398acdfeccf7184d3ddd900
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon May 18 01:32:49 2020 +0200
[FLINK-17781][coordination] Use Scheduler and MainThreadExecutor in OperatorCoordinator.Context.
This needs (unfortunately) a switch to lazy initialization of the context:
- The Scheduler needs to be created before the OperatorCoordinator Contexts are created.
One could do that by creating the Coordinators lazily after the Scheduler.
- The Scheduler restores the savepoints as part of the scheduler creation, when the ExecutionGraph
and the CheckpointCoordinator are created early in the constructor.
- That means the OperatorCoordinator (or a placeholder component, here the
OperatorCoordinatorHolder) needs to exist to accept the restored state.
That brings us to a cyclic dependency:
- OperatorCoordinator (context) needs Scheduler and MainThreadExecutor
- Scheduler and MainThreadExecutor need constructed ExecutionGraph
- ExecutionGraph needs CheckpointCoordinator
- CheckpointCoordinator needs OperatorCoordinator
Breaking the Cycle
To break this cyclic dependency, this change introduces a form of lazy initialization:
- We eagerly create the OperatorCoordinators so they exist for state restore
- We provide an uninitialized context to them
- When the Scheduler is started (after leadership is granted), we initialize the
context with the (then readily constructed) Scheduler and MainThreadExecutor
This closes #12225
---
.../runtime/executiongraph/ExecutionGraph.java | 9 +-
.../runtime/executiongraph/ExecutionJobVertex.java | 41 ++--
.../coordination/OperatorCoordinator.java | 10 +
.../coordination/OperatorCoordinatorHolder.java | 236 +++++++++++++++++++++
.../coordination/OperatorCoordinatorUtil.java | 61 ------
.../flink/runtime/scheduler/SchedulerBase.java | 56 +++--
.../flink/runtime/scheduler/SchedulerNG.java | 10 +
.../OperatorCoordinatorSchedulerTest.java | 11 +-
8 files changed, 328 insertions(+), 106 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index e348059..b84a86f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -60,12 +60,11 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
-import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.scheduler.InternalFailuresListener;
import org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology;
@@ -570,10 +569,10 @@ public class ExecutionGraph implements AccessExecutionGraph {
private Collection<OperatorCoordinatorCheckpointContext> buildOpCoordinatorCheckpointContexts() {
final ArrayList<OperatorCoordinatorCheckpointContext> contexts = new ArrayList<>();
for (final ExecutionJobVertex vertex : verticesInCreationOrder) {
- for (final Map.Entry<OperatorID, OperatorCoordinator> coordinator : vertex.getOperatorCoordinatorMap().entrySet()) {
+ for (final OperatorCoordinatorHolder coordinator : vertex.getOperatorCoordinators()) {
contexts.add(new OperatorCoordinatorCheckpointContext(
- coordinator.getValue(),
- coordinator.getKey(),
+ coordinator,
+ coordinator.getOperatorId(),
vertex.getMaxParallelism(),
vertex.getParallelism()));
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index ee2cd2a..0b5f0ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -44,13 +44,13 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
-import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorUtil;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.types.Either;
+import org.apache.flink.util.IOUtils;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
@@ -119,7 +119,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
*/
private Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey = null;
- private final Map<OperatorID, OperatorCoordinator> operatorCoordinators;
+ private final Collection<OperatorCoordinatorHolder> operatorCoordinators;
private InputSplitAssigner splitAssigner;
@@ -229,16 +229,20 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
}
}
- try {
- final Map<OperatorID, OperatorCoordinator> coordinators = OperatorCoordinatorUtil.instantiateCoordinators(
- jobVertex.getOperatorCoordinators(),
- graph.getUserClassLoader(),
- (opId) -> new ExecutionJobVertexCoordinatorContext(opId, this));
-
- this.operatorCoordinators = Collections.unmodifiableMap(coordinators);
- }
- catch (IOException | ClassNotFoundException e) {
- throw new JobException("Cannot instantiate the coordinator for operator " + getName(), e);
+ final List<SerializedValue<OperatorCoordinator.Provider>> coordinatorProviders = getJobVertex().getOperatorCoordinators();
+ if (coordinatorProviders.isEmpty()) {
+ this.operatorCoordinators = Collections.emptyList();
+ } else {
+ final ArrayList<OperatorCoordinatorHolder> coordinators = new ArrayList<>(coordinatorProviders.size());
+ try {
+ for (final SerializedValue<OperatorCoordinator.Provider> provider : coordinatorProviders) {
+ coordinators.add(OperatorCoordinatorHolder.create(provider, this, graph.getUserClassLoader()));
+ }
+ } catch (Exception | LinkageError e) {
+ IOUtils.closeAllQuietly(coordinators);
+ throw new JobException("Cannot instantiate the coordinator for operator " + getName(), e);
+ }
+ this.operatorCoordinators = Collections.unmodifiableList(coordinators);
}
// set up the input splits, if the vertex has any
@@ -371,19 +375,10 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
return getJobVertex().getInputDependencyConstraint();
}
- @Nullable
- public OperatorCoordinator getOperatorCoordinator(OperatorID operatorId) {
- return operatorCoordinators.get(operatorId);
- }
-
- public Map<OperatorID, OperatorCoordinator> getOperatorCoordinatorMap() {
+ public Collection<OperatorCoordinatorHolder> getOperatorCoordinators() {
return operatorCoordinators;
}
- public Collection<OperatorCoordinator> getOperatorCoordinators() {
- return operatorCoordinators.values();
- }
-
public Either<SerializedValue<TaskInformation>, PermanentBlobKey> getTaskInformationOrBlobKey() throws IOException {
// only one thread should offload the task information, so let's also let only one thread
// serialize the task information!
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
index 27e0910..cb388b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
@@ -32,6 +32,16 @@ import java.util.concurrent.CompletableFuture;
*
* <p>Operator coordinators are for example source and sink coordinators that discover and assign
* work, or aggregate and commit metadata.
+ *
+ * <h2>Thread Model</h2>
+ *
+ * <p>All coordinator methods are called by the Job Manager's main thread (mailbox thread). That means that
+ * these methods must not, under any circumstances, perform blocking operations (like I/O or waiting on
+ * locks or futures). That would run a high risk of bringing down the entire JobManager.
+ *
+ * <p>Coordinators that involve more complex operations should hence spawn threads to handle the I/O work.
+ * The methods on the {@link Context} are safe to call from a thread other than the one that
+ * calls the Coordinator's methods.
*/
public interface OperatorCoordinator extends AutoCloseable {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
new file mode 100644
index 0000000..43c47fe
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TemporaryClassLoaderContext;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A holder for an {@link OperatorCoordinator.Context} and all the necessary facilities around it that
+ * are needed for the interaction between the Coordinator, the Scheduler, the Checkpoint Coordinator, etc.
+ *
+ * <p>The holder is itself a {@link OperatorCoordinator} and forwards all calls to the actual coordinator.
+ * That way, we can make adjustments to assumptions about the threading model and message/call forwarding
+ * without needing to adjust all the call sites that interact with the coordinator.
+ *
+ * <p>This is also needed, unfortunately, because we need a lazy two-step initialization:
+ * When the execution graph is created, we need to create the coordinators (or the holders, to be specific)
+ * because the CheckpointCoordinator is also created in the ExecutionGraph and needs access to them.
+ * However, the coordinators' context can only be fully initialized after the SchedulerNG was created,
+ * because it needs a reference to it for the failure calls. The coordinators are therefore created
+ * eagerly with an uninitialized context, which is completed later via
+ * {@link #lazyInitialize(SchedulerNG, Executor)}.
+ */
+public class OperatorCoordinatorHolder implements OperatorCoordinator {
+
+	private final OperatorCoordinator coordinator;
+	private final OperatorID operatorId;
+	private final LazyInitializedCoordinatorContext context;
+
+	private OperatorCoordinatorHolder(
+			final OperatorID operatorId,
+			final OperatorCoordinator coordinator,
+			final LazyInitializedCoordinatorContext context) {
+
+		this.operatorId = checkNotNull(operatorId);
+		this.coordinator = checkNotNull(coordinator);
+		this.context = checkNotNull(context);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** Returns the ID of the operator whose coordinator is held by this holder. */
+	public OperatorID getOperatorId() {
+		return operatorId;
+	}
+
+	/** Returns the actual coordinator to which this holder forwards all calls. */
+	public OperatorCoordinator getCoordinator() {
+		return coordinator;
+	}
+
+	/**
+	 * Completes the lazy initialization of the coordinator's context. This must be called before
+	 * {@link #start()}, and before the coordinator sends events or fails the job through its context.
+	 *
+	 * @param scheduler The scheduler to which global failures triggered by the coordinator are reported.
+	 * @param schedulerExecutor The executor on which the failure calls into the scheduler are run.
+	 */
+	public void lazyInitialize(SchedulerNG scheduler, Executor schedulerExecutor) {
+		context.lazyInitialize(scheduler, schedulerExecutor);
+	}
+
+	// ------------------------------------------------------------------------
+	//  OperatorCoordinator Interface
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void start() throws Exception {
+		checkState(context.isInitialized(), "Coordinator Context is not yet initialized");
+		coordinator.start();
+	}
+
+	@Override
+	public void close() throws Exception {
+		try {
+			coordinator.close();
+		} finally {
+			// drop the scheduler references even if closing the coordinator failed
+			context.unInitialize();
+		}
+	}
+
+	@Override
+	public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
+		coordinator.handleEventFromOperator(subtask, event);
+	}
+
+	@Override
+	public void subtaskFailed(int subtask, @Nullable Throwable reason) {
+		coordinator.subtaskFailed(subtask, reason);
+	}
+
+	@Override
+	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
+		return coordinator.checkpointCoordinator(checkpointId);
+	}
+
+	@Override
+	public void checkpointComplete(long checkpointId) {
+		coordinator.checkpointComplete(checkpointId);
+	}
+
+	@Override
+	public void resetToCheckpoint(byte[] checkpointData) throws Exception {
+		coordinator.resetToCheckpoint(checkpointData);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Factories
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Deserializes the given coordinator provider (using the given user-code class loader) and
+	 * eagerly creates the coordinator with a not-yet-initialized context.
+	 *
+	 * @param serializedProvider The serialized provider that creates the coordinator.
+	 * @param jobVertex The execution job vertex that contains the coordinated operator.
+	 * @param classLoader The user-code class loader.
+	 * @return A holder wrapping the newly created coordinator.
+	 * @throws IOException Thrown if deserializing the provider fails.
+	 * @throws ClassNotFoundException Thrown if a class of the provider cannot be loaded.
+	 */
+	public static OperatorCoordinatorHolder create(
+			SerializedValue<OperatorCoordinator.Provider> serializedProvider,
+			ExecutionJobVertex jobVertex,
+			ClassLoader classLoader) throws IOException, ClassNotFoundException {
+
+		try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
+			final OperatorCoordinator.Provider provider = serializedProvider.deserializeValue(classLoader);
+			final OperatorID opId = provider.getOperatorId();
+			final LazyInitializedCoordinatorContext context = new LazyInitializedCoordinatorContext(opId, jobVertex);
+			final OperatorCoordinator coordinator = provider.create(context);
+			return new OperatorCoordinatorHolder(opId, coordinator, context);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Nested Classes
+	// ------------------------------------------------------------------------
+
+	/**
+	 * An implementation of the {@link OperatorCoordinator.Context}.
+	 *
+	 * <p>All methods are safe to be called from other threads than the Scheduler's and the JobMaster's
+	 * main threads. The scheduler references are {@code volatile} because they are set and cleared
+	 * by the scheduler while coordinator threads may concurrently read them.
+	 *
+	 * <p>Implementation note: Ideally, we would like to operate purely against the scheduler
+	 * interface, but it is not exposing enough information at the moment.
+	 */
+	private static final class LazyInitializedCoordinatorContext implements OperatorCoordinator.Context {
+
+		private final OperatorID operatorId;
+		private final ExecutionJobVertex jobVertex;
+
+		@Nullable
+		private volatile SchedulerNG scheduler;
+		@Nullable
+		private volatile Executor schedulerExecutor;
+
+		public LazyInitializedCoordinatorContext(OperatorID operatorId, ExecutionJobVertex jobVertex) {
+			this.operatorId = checkNotNull(operatorId);
+			this.jobVertex = checkNotNull(jobVertex);
+		}
+
+		void lazyInitialize(SchedulerNG scheduler, Executor schedulerExecutor) {
+			this.scheduler = checkNotNull(scheduler);
+			this.schedulerExecutor = checkNotNull(schedulerExecutor);
+		}
+
+		void unInitialize() {
+			this.scheduler = null;
+			this.schedulerExecutor = null;
+		}
+
+		boolean isInitialized() {
+			// the job vertex is always non-null (set in the constructor), so it cannot signal
+			// initialization; only the lazily set scheduler references can
+			return scheduler != null && schedulerExecutor != null;
+		}
+
+		private void checkInitialized() {
+			checkState(isInitialized(), "Context was not yet initialized");
+		}
+
+		@Override
+		public OperatorID getOperatorId() {
+			return operatorId;
+		}
+
+		@Override
+		public CompletableFuture<Acknowledge> sendEvent(final OperatorEvent evt, final int targetSubtask) {
+			checkInitialized();
+
+			final int parallelism = currentParallelism();
+			if (targetSubtask < 0 || targetSubtask >= parallelism) {
+				throw new IllegalArgumentException(
+					String.format("subtask index %d out of bounds [0, %d).", targetSubtask, parallelism));
+			}
+
+			final SerializedValue<OperatorEvent> serializedEvent;
+			try {
+				serializedEvent = new SerializedValue<>(evt);
+			}
+			catch (IOException e) {
+				// we do not expect that this exception is handled by the caller, so we make it
+				// unchecked so that it can bubble up
+				throw new FlinkRuntimeException("Cannot serialize operator event", e);
+			}
+
+			final Execution executionAttempt = jobVertex.getTaskVertices()[targetSubtask].getCurrentExecutionAttempt();
+			return executionAttempt.sendOperatorEvent(operatorId, serializedEvent);
+		}
+
+		@Override
+		public void failTask(final int subtask, final Throwable cause) {
+			// failing individual subtasks is not supported by this context yet
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void failJob(final Throwable cause) {
+			// read each volatile field exactly once, so that a concurrent unInitialize()
+			// cannot null out a reference between the check and its use
+			final SchedulerNG currentScheduler = scheduler;
+			final Executor currentExecutor = schedulerExecutor;
+			checkState(currentScheduler != null && currentExecutor != null, "Context was not yet initialized");
+
+			final FlinkException e = new FlinkException("Global failure triggered by OperatorCoordinator for '" +
+				jobVertex.getName() + "' (operator " + operatorId + ").", cause);
+
+			currentExecutor.execute(() -> currentScheduler.handleGlobalFailure(e));
+		}
+
+		@Override
+		public int currentParallelism() {
+			// deliberately no initialization check: the job vertex is available from construction on,
+			// and state restore may call into the coordinator before the context is initialized
+			return jobVertex.getParallelism();
+		}
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorUtil.java
deleted file mode 100644
index 154b547..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorUtil.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.coordination;
-
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.util.SerializedValue;
-import org.apache.flink.util.TemporaryClassLoaderContext;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-
-/**
- * A utility to for dealing with the {@link OperatorCoordinator}.
- */
-public final class OperatorCoordinatorUtil {
-
- public static Map<OperatorID, OperatorCoordinator> instantiateCoordinators(
- List<SerializedValue<OperatorCoordinator.Provider>> providers,
- ClassLoader classLoader,
- Function<OperatorID, OperatorCoordinator.Context> contextFactory) throws IOException, ClassNotFoundException {
-
- try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
-
- final HashMap<OperatorID, OperatorCoordinator> coordinators = new HashMap<>();
-
- for (SerializedValue<OperatorCoordinator.Provider> serializedProvider : providers) {
- final OperatorCoordinator.Provider provider = serializedProvider.deserializeValue(classLoader);
- final OperatorID id = provider.getOperatorId();
- final OperatorCoordinator.Context context = contextFactory.apply(id);
- final OperatorCoordinator coordinator = provider.create(context);
- coordinators.put(id, coordinator);
- }
-
- return coordinators;
- }
- }
-
- // ------------------------------------------------------------------------
-
- /** Utility class, not meant to be instantiated. */
- private OperatorCoordinatorUtil() {}
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index c2c911a8..6f2ae47 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -81,6 +81,7 @@ import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.runtime.query.KvStateLocation;
@@ -169,7 +170,7 @@ public abstract class SchedulerBase implements SchedulerNG {
protected final ExecutionVertexVersioner executionVertexVersioner;
- private final Map<OperatorID, OperatorCoordinator> coordinatorMap;
+ private final Map<OperatorID, OperatorCoordinatorHolder> coordinatorMap;
private ComponentMainThreadExecutor mainThreadExecutor = new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor(
"SchedulerBase is not initialized with proper main thread executor. " +
@@ -443,6 +444,7 @@ public abstract class SchedulerBase implements SchedulerNG {
@Override
public void setMainThreadExecutor(final ComponentMainThreadExecutor mainThreadExecutor) {
this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
+ initializeOperatorCoordinators(mainThreadExecutor);
executionGraph.start(mainThreadExecutor);
}
@@ -901,6 +903,21 @@ public abstract class SchedulerBase implements SchedulerNG {
.orElse("Unknown location");
}
+ // ------------------------------------------------------------------------
+ // Operator Coordinators
+ //
+ // Note: It may be worthwhile to move the OperatorCoordinators out
+ // of the scheduler (have them owned by the JobMaster directly).
+ // Then we could avoid routing these events through the scheduler and
+ // doing this lazy initialization dance. However, this would require
+ // that the Scheduler does not eagerly construct the CheckpointCoordinator
+ // in the ExecutionGraph and does not eagerly restore the savepoint while
+ // doing that. Because during savepoint restore, the OperatorCoordinators
+ // (or at least their holders) already need to exist, to accept the restored
+ // state. But some components they depend on (Scheduler and MainThreadExecutor)
+ // are not fully usable and accessible at that point.
+ // ------------------------------------------------------------------------
+
@Override
public void deliverOperatorEventToCoordinator(
final ExecutionAttemptID taskExecutionId,
@@ -922,8 +939,7 @@ public abstract class SchedulerBase implements SchedulerNG {
throw new TaskNotRunningException("Task is not known or in state running on the JobManager.");
}
- final ExecutionJobVertex ejv = exec.getVertex().getJobVertex();
- final OperatorCoordinator coordinator = ejv.getOperatorCoordinator(operatorId);
+ final OperatorCoordinatorHolder coordinator = coordinatorMap.get(operatorId);
if (coordinator == null) {
throw new FlinkException("No coordinator registered for operator " + operatorId);
}
@@ -932,7 +948,7 @@ public abstract class SchedulerBase implements SchedulerNG {
coordinator.handleEventFromOperator(exec.getParallelSubtaskIndex(), evt);
} catch (Throwable t) {
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
- failJob(t);
+ handleGlobalFailure(t);
}
}
@@ -940,20 +956,30 @@ public abstract class SchedulerBase implements SchedulerNG {
public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
OperatorID operator,
CoordinationRequest request) throws FlinkException {
- OperatorCoordinator coordinator = coordinatorMap.get(operator);
+
+ final OperatorCoordinatorHolder coordinatorHolder = coordinatorMap.get(operator);
+ if (coordinatorHolder == null){
+ throw new FlinkException("Coordinator of operator " + operator + " does not exist");
+ }
+
+ final OperatorCoordinator coordinator = coordinatorHolder.getCoordinator();
if (coordinator instanceof CoordinationRequestHandler) {
return ((CoordinationRequestHandler) coordinator).handleCoordinationRequest(request);
- } else if (coordinator != null) {
- throw new FlinkException("Coordinator of operator " + operator + " cannot handle client event");
} else {
- throw new FlinkException("Coordinator of operator " + operator + " does not exist");
+ throw new FlinkException("Coordinator of operator " + operator + " cannot handle client event");
+ }
+ }
+
+ private void initializeOperatorCoordinators(Executor mainThreadExecutor) {
+ for (OperatorCoordinatorHolder coordinatorHolder : getAllCoordinators()) {
+ coordinatorHolder.lazyInitialize(this, mainThreadExecutor);
}
}
private void startAllOperatorCoordinators() {
- final Collection<OperatorCoordinator> coordinators = getAllCoordinators();
+ final Collection<OperatorCoordinatorHolder> coordinators = getAllCoordinators();
try {
- for (OperatorCoordinator coordinator : coordinators) {
+ for (OperatorCoordinatorHolder coordinator : coordinators) {
coordinator.start();
}
}
@@ -968,15 +994,15 @@ public abstract class SchedulerBase implements SchedulerNG {
getAllCoordinators().forEach(IOUtils::closeQuietly);
}
- private Collection<OperatorCoordinator> getAllCoordinators() {
+ private Collection<OperatorCoordinatorHolder> getAllCoordinators() {
return coordinatorMap.values();
}
- private Map<OperatorID, OperatorCoordinator> createCoordinatorMap() {
- Map<OperatorID, OperatorCoordinator> coordinatorMap = new HashMap<>();
+ private Map<OperatorID, OperatorCoordinatorHolder> createCoordinatorMap() {
+ Map<OperatorID, OperatorCoordinatorHolder> coordinatorMap = new HashMap<>();
for (ExecutionJobVertex vertex : executionGraph.getAllVertices().values()) {
- for (Map.Entry<OperatorID, OperatorCoordinator> entry : vertex.getOperatorCoordinatorMap().entrySet()) {
- coordinatorMap.put(entry.getKey(), entry.getValue());
+ for (OperatorCoordinatorHolder holder : vertex.getOperatorCoordinators()) {
+ coordinatorMap.put(holder.getOperatorId(), holder);
}
}
return coordinatorMap;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
index 07f7e7e..1675bc0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
@@ -127,6 +127,16 @@ public interface SchedulerNG {
CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean advanceToEndOfEventTime);
// ------------------------------------------------------------------------
+ // Operator Coordinator related methods
+ //
+ // These are necessary as long as the Operator Coordinators are part of the
+ // scheduler. There are good reasons to pull them out of the Scheduler and
+ // make them directly a part of the JobMaster. However, we would need to
+ // rework the complete CheckpointCoordinator initialization before we can
+ // do that, because the CheckpointCoordinator is initialized (and restores
+ // savepoint) in the scheduler constructor, which requires the coordinators
+ // to be there as well.
+ // ------------------------------------------------------------------------
/**
* Delivers the given OperatorEvent to the {@link OperatorCoordinator} with the given {@link OperatorID}.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 935e0ec..14a136d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -61,6 +61,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
+import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
@@ -501,8 +502,14 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
final ExecutionJobVertex vertexWithCoordinator = getJobVertex(scheduler, testVertexId);
assertNotNull("vertex for coordinator not found", vertexWithCoordinator);
- final OperatorCoordinator coordinator = vertexWithCoordinator.getOperatorCoordinator(testOperatorId);
- assertNotNull("vertex does not contain coordinator", coordinator);
+ final Optional<OperatorCoordinatorHolder> coordinatorOptional = vertexWithCoordinator
+ .getOperatorCoordinators()
+ .stream()
+ .filter((holder) -> holder.getOperatorId().equals(testOperatorId))
+ .findFirst();
+ assertTrue("vertex does not contain coordinator", coordinatorOptional.isPresent());
+
+ final OperatorCoordinator coordinator = coordinatorOptional.get().getCoordinator();
assertThat(coordinator, instanceOf(TestingOperatorCoordinator.class));
return (TestingOperatorCoordinator) coordinator;