You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/08/03 14:36:57 UTC
[flink] 06/06: [FLINK-9936][mesos] Wait for leadership before creating MesosResourceManager components
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5b1500d4fe39a44bfaa03b27216627e3caf55240
Author: gyao <ga...@data-artisans.com>
AuthorDate: Mon Jul 30 10:57:40 2018 +0800
[FLINK-9936][mesos] Wait for leadership before creating MesosResourceManager components
This closes #6464.
---
.../clusterframework/MesosResourceManager.java | 130 +++++++++++++--------
.../clusterframework/MesosResourceManagerTest.java | 21 ++++
.../runtime/resourcemanager/ResourceManager.java | 90 ++++++++++----
3 files changed, 173 insertions(+), 68 deletions(-)
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index e24214d..6fc5322 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -84,6 +84,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import scala.Option;
@@ -138,6 +139,8 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
+ private MesosConfiguration initializedMesosConfig;
+
public MesosResourceManager(
// base class
RpcService rpcService,
@@ -220,9 +223,6 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
// Resource Manager overrides
// ------------------------------------------------------------------------
- /**
- * Starts the Mesos-specifics.
- */
@Override
protected void initialize() throws ResourceManagerException {
// create and start the worker store
@@ -233,9 +233,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
throw new ResourceManagerException("Unable to initialize the worker store.", e);
}
- // register with Mesos
- // TODO : defer connection until RM acquires leadership
-
+ // Prepare to register with Mesos
Protos.FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo()
.clone()
.setCheckpoint(true);
@@ -251,49 +249,86 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
throw new ResourceManagerException("Unable to recover the framework ID.", e);
}
- MesosConfiguration initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
+ initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig);
+
+ this.selfActor = createSelfActor();
+
+ // configure the artifact server to serve the TM container artifacts
+ try {
+ LaunchableMesosWorker.configureArtifactServer(artifactServer, taskManagerContainerSpec);
+ }
+ catch (IOException e) {
+ throw new ResourceManagerException("Unable to configure the artifact server with TaskManager artifacts.", e);
+ }
+ }
+
+ @Override
+ protected CompletableFuture<Void> prepareLeadershipAsync() {
+ Preconditions.checkState(initializedMesosConfig != null);
+
schedulerDriver = initializedMesosConfig.createDriver(
new MesosResourceManagerSchedulerCallback(),
false);
// create supporting actors
- selfActor = createSelfActor();
connectionMonitor = createConnectionMonitor();
launchCoordinator = createLaunchCoordinator(schedulerDriver, selfActor);
reconciliationCoordinator = createReconciliationCoordinator(schedulerDriver);
taskMonitor = createTaskMonitor(schedulerDriver);
- // recover state
- try {
- recoverWorkers();
- } catch (Exception e) {
- throw new ResourceManagerException("Unable to recover Mesos worker state.", e);
- }
+ return getWorkersAsync().thenApplyAsync((tasksFromPreviousAttempts) -> {
+ // recover state
+ recoverWorkers(tasksFromPreviousAttempts);
- // configure the artifact server to serve the TM container artifacts
- try {
- LaunchableMesosWorker.configureArtifactServer(artifactServer, taskManagerContainerSpec);
- }
- catch (IOException e) {
- throw new ResourceManagerException("Unable to configure the artifact server with TaskManager artifacts.", e);
- }
+ // begin scheduling
+ connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
+ schedulerDriver.start();
- // begin scheduling
- connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
- schedulerDriver.start();
+ LOG.info("Mesos resource manager started.");
+ return null;
+ }, getMainThreadExecutor());
+ }
+
+ @Override
+ protected CompletableFuture<Void> clearStateAsync() {
+ schedulerDriver.stop(true);
+
+ workersInNew.clear();
+ workersInLaunch.clear();
+ workersBeingReturned.clear();
- LOG.info("Mesos resource manager initialized.");
+ return stopSupportingActorsAsync();
}
/**
- * Recover framework/worker information persisted by a prior incarnation of the RM.
+ * Fetches framework/worker information persisted by a prior incarnation of the RM.
*/
- private void recoverWorkers() throws Exception {
+ private CompletableFuture<List<MesosWorkerStore.Worker>> getWorkersAsync() {
// if this resource manager is recovering from failure,
// then some worker tasks are most likely still alive and we can re-obtain them
- final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = workerStore.recoverWorkers();
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = workerStore.recoverWorkers();
+ for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
+ if (worker.state() == MesosWorkerStore.WorkerState.New) {
+ // remove new workers because allocation requests are transient
+ workerStore.removeWorker(worker.taskID());
+ }
+ }
+ return tasksFromPreviousAttempts;
+ } catch (final Exception e) {
+ throw new CompletionException(new ResourceManagerException(e));
+ }
+ }, getRpcService().getExecutor());
+ }
+ /**
+ * Recovers given framework/worker information.
+ *
+ * @see #getWorkersAsync()
+ */
+ private void recoverWorkers(final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts) {
assert(workersInNew.isEmpty());
assert(workersInLaunch.isEmpty());
assert(workersBeingReturned.isEmpty());
@@ -304,15 +339,10 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
List<Tuple2<TaskRequest, String>> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size());
for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
- LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID(), worker.profile());
-
switch(worker.state()) {
- case New:
- // remove new workers because allocation requests are transient
- workerStore.removeWorker(worker.taskID());
- break;
case Launched:
workersInLaunch.put(extractResourceID(worker.taskID()), worker);
+ final LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID(), worker.profile());
toAssign.add(new Tuple2<>(launchable.taskRequest(), worker.hostname().get()));
break;
case Released:
@@ -329,8 +359,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
}
}
- @Override
- public CompletableFuture<Void> postStop() {
+ private CompletableFuture<Void> stopSupportingActorsAsync() {
FiniteDuration stopTimeout = new FiniteDuration(5L, TimeUnit.SECONDS);
CompletableFuture<Boolean> stopTaskMonitorFuture = stopActor(taskMonitor, stopTimeout);
@@ -345,23 +374,23 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
CompletableFuture<Boolean> stopReconciliationCoordinatorFuture = stopActor(reconciliationCoordinator, stopTimeout);
reconciliationCoordinator = null;
- CompletableFuture<Void> stopFuture = CompletableFuture.allOf(
+ return CompletableFuture.allOf(
stopTaskMonitorFuture,
stopConnectionMonitorFuture,
stopLaunchCoordinatorFuture,
stopReconciliationCoordinatorFuture);
+ }
- final CompletableFuture<Void> terminationFuture = super.postStop();
-
- return stopFuture.thenCombine(
- terminationFuture,
- (Void voidA, Void voidB) -> null);
+ @Override
+ public CompletableFuture<Void> postStop() {
+ return stopSupportingActorsAsync().thenCompose((ignored) -> super.postStop());
}
@Override
protected void internalDeregisterApplication(
ApplicationStatus finalStatus,
@Nullable String diagnostics) throws ResourceManagerException {
+
LOG.info("Shutting down and unregistering as a Mesos framework.");
Exception exception = null;
@@ -627,11 +656,16 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
* Tries to shut down the given actor gracefully.
*
* @param actorRef specifying the actor to shut down
- * @param timeout for the graceful shut down
- * @return Future containing the result of the graceful shut down
+ * @param timeout for the graceful shut down
+ * @return A future that finishes with {@code true} if and only if the actor could be stopped gracefully
+ * or {@code actorRef} was {@code null}.
*/
- private CompletableFuture<Boolean> stopActor(final ActorRef actorRef, FiniteDuration timeout) {
- return FutureUtils.toJava(Patterns.gracefulStop(actorRef, timeout))
+ private CompletableFuture<Boolean> stopActor(@Nullable final ActorRef actorRef, FiniteDuration timeout) {
+ if (actorRef == null) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ return FutureUtils.<Boolean>toJava(Patterns.gracefulStop(actorRef, timeout))
.exceptionally(
(Throwable throwable) -> {
// The actor did not stop gracefully in time, try to directly stop it
@@ -639,7 +673,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
log.warn("Could not stop actor {} gracefully.", actorRef.path(), throwable);
- return true;
+ return false;
}
);
}
@@ -794,7 +828,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
@Override
public void disconnected(SchedulerDriver driver) {
- runAsync(new Runnable() {
+ runAsyncWithoutFencing(new Runnable() {
@Override
public void run() {
MesosResourceManager.this.disconnected(new Disconnected());
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 5af3fa0..171e408 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -505,6 +505,7 @@ public class MesosResourceManagerTest extends TestLogger {
@Override
public void close() throws Exception {
rpcService.stopService().get();
+ fatalErrorHandler.rethrowError();
}
}
@@ -807,4 +808,24 @@ public class MesosResourceManagerTest extends TestLogger {
resourceManager.taskRouter.expectMsgClass(Disconnected.class);
}};
}
+
+ @Test
+ public void testClearStateAfterRevokeLeadership() throws Exception {
+ new Context() {{
+ final MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newWorker(task1);
+ final MesosWorkerStore.Worker worker2 = MesosWorkerStore.Worker.newWorker(task2).launchWorker(slave1, slave1host);
+ final MesosWorkerStore.Worker worker3 = MesosWorkerStore.Worker.newWorker(task3).launchWorker(slave1, slave1host).releaseWorker();
+ when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+ when(rmServices.workerStore.recoverWorkers()).thenReturn(Arrays.asList(worker1, worker2, worker3)).thenReturn(Collections.emptyList());
+
+ startResourceManager();
+ rmServices.rmLeaderElectionService.notLeader();
+ rmServices.grantLeadership();
+
+ assertThat(resourceManager.workersInNew.size(), equalTo(0));
+ assertThat(resourceManager.workersInLaunch.size(), equalTo(0));
+ assertThat(resourceManager.workersBeingReturned.size(), equalTo(0));
+ verify(rmServices.schedulerDriver).stop(true);
+ }};
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index a992632..7a54224 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -139,6 +139,14 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
/** All registered listeners for status updates of the ResourceManager. */
private ConcurrentMap<String, InfoMessageListenerRpcGateway> infoMessageListeners;
+ /**
+ * Represents asynchronous state clearing work.
+ *
+ * @see #clearStateAsync()
+ * @see #clearStateInternal()
+ */
+ private CompletableFuture<Void> clearStateFuture = CompletableFuture.completedFuture(null);
+
public ResourceManager(
RpcService rpcService,
String resourceManagerEndpointId,
@@ -192,6 +200,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
+ initialize();
+
try {
leaderElectionService.start(this);
} catch (Exception e) {
@@ -203,8 +213,6 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
} catch (Exception e) {
throw new ResourceManagerException("Could not start the job leader id service.", e);
}
-
- initialize();
}
@Override
@@ -233,7 +241,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
- clearState();
+ clearStateInternal();
if (exception != null) {
return FutureUtils.completedExceptionally(
@@ -724,7 +732,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
}
}
- private void clearState() {
+ private void clearStateInternal() {
jobManagerRegistrations.clear();
jmResourceIdRegistrations.clear();
taskExecutors.clear();
@@ -734,6 +742,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
} catch (Exception e) {
onFatalError(new ResourceManagerException("Could not properly clear the job leader id service.", e));
}
+ clearStateFuture = clearStateAsync();
}
/**
@@ -886,26 +895,45 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
*/
@Override
public void grantLeadership(final UUID newLeaderSessionID) {
- runAsyncWithoutFencing(
- () -> {
- final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID);
-
- log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId);
+ final CompletableFuture<Boolean> acceptLeadershipFuture = clearStateFuture
+ .thenComposeAsync((ignored) -> tryAcceptLeadership(newLeaderSessionID), getUnfencedMainThreadExecutor());
+
+ final CompletableFuture<Void> confirmationFuture = acceptLeadershipFuture.thenAcceptAsync(
+ (acceptLeadership) -> {
+ if (acceptLeadership) {
+ // confirming the leader session ID might be blocking, so it runs on the RPC service's executor
+ leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
+ }
+ },
+ getRpcService().getExecutor());
- // clear the state if we've been the leader before
- if (getFencingToken() != null) {
- clearState();
+ confirmationFuture.whenComplete(
+ (Void ignored, Throwable throwable) -> {
+ if (throwable != null) {
+ onFatalError(ExceptionUtils.stripCompletionException(throwable));
}
+ });
+ }
- setFencingToken(newResourceManagerId);
+ private CompletableFuture<Boolean> tryAcceptLeadership(final UUID newLeaderSessionID) {
+ if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
+ final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID);
- slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
+ log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId);
- getRpcService().execute(
- () ->
- // confirming the leader session ID might be blocking,
- leaderElectionService.confirmLeaderSessionID(newLeaderSessionID));
- });
+ // clear the state if we've been the leader before
+ if (getFencingToken() != null) {
+ clearStateInternal();
+ }
+
+ setFencingToken(newResourceManagerId);
+
+ slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
+
+ return prepareLeadershipAsync().thenApply(ignored -> true);
+ } else {
+ return CompletableFuture.completedFuture(false);
+ }
}
/**
@@ -917,7 +945,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
() -> {
log.info("ResourceManager {} was revoked leadership. Clearing fencing token.", getAddress());
- clearState();
+ clearStateInternal();
setFencingToken(null);
@@ -947,6 +975,28 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
protected abstract void initialize() throws ResourceManagerException;
/**
+ * This method can be overridden to add a (non-blocking) initialization routine to the
+ * ResourceManager that will be called when leadership is granted but before leadership is
+ * confirmed.
+ *
+ * @return Returns a {@code CompletableFuture} that completes when the computation is finished.
+ */
+ protected CompletableFuture<Void> prepareLeadershipAsync() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ /**
+ * This method can be overridden to add a (non-blocking) state clearing routine to the
+ * ResourceManager that will be called whenever its state is cleared, e.g. when leadership is revoked.
+ *
+ * @return Returns a {@code CompletableFuture} that completes when the state clearing routine
+ * is finished.
+ */
+ protected CompletableFuture<Void> clearStateAsync() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ /**
* The framework specific code to deregister the application. This should report the
* application's final status and shut down the resource manager cleanly.
*