Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/09 02:20:28 UTC

[GitHub] [flink] xintongsong commented on a change in pull request #13313: [FLINK-18722][mesos] Migrate MesosResourceManager to the new MesosResourceManagerDriver

xintongsong commented on a change in pull request #13313:
URL: https://github.com/apache/flink/pull/13313#discussion_r502139646



##########
File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerDriver.java
##########
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.mesos.runtime.clusterframework.actors.MesosResourceManagerActorFactory;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.actor.UntypedAbstractActor;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Implementation of {@link ResourceManagerDriver} for Mesos deployment.
+ */
+public class MesosResourceManagerDriver extends AbstractResourceManagerDriver<RegisteredMesosWorkerNode> {
+
+	/** The Mesos configuration (master and framework info). */
+	private final MesosConfiguration mesosConfig;
+
+	/** The Mesos services needed by the resource manager. */
+	private final MesosServices mesosServices;
+
+	/** The TaskManager container parameters (like container memory size). */
+	private final MesosTaskManagerParameters taskManagerParameters;
+
+	/** Container specification for launching a TM. */
+	private final ContainerSpecification taskManagerContainerSpec;
+
+	/** Server for HTTP artifacts. */
+	private final MesosArtifactServer artifactServer;
+
+	/** Persistent storage of allocated containers. */
+	private MesosWorkerStore workerStore;
+
+	/** Factory for creating local actors. */
+	private final MesosResourceManagerActorFactory actorFactory;
+
+	/** Web URL to show in the Mesos web UI. */
+	@Nullable
+	private final String webUiUrl;
+
+	/** Mesos scheduler driver. */
+	private SchedulerDriver schedulerDriver;
+
+	/** An adapter to receive messages from Akka actors. */
+	private ActorRef selfActor;
+
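+	/** Actor that monitors the connection to Mesos. */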
+	private ActorRef connectionMonitor;
+
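+	/** Actor that monitors launched Mesos tasks throughout their lifecycle. */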
+	private ActorRef taskMonitor;
+
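+	/** Actor that matches resource offers to pending task launch requests. */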
+	private ActorRef launchCoordinator;
+
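+	/** Actor that coordinates task status reconciliation with Mesos. */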
+	private ActorRef reconciliationCoordinator;
+
+	/** Workers that are requested but not yet offered. */
+	private final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
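+	/** Pending worker request futures, completed once the corresponding task is launched. */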
+	private final Map<ResourceID, CompletableFuture<RegisteredMesosWorkerNode>> requestResourceFutures;
+
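+	/** The Mesos configuration with framework info attached, set during initialization. */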
+	private MesosConfiguration initializedMesosConfig;
+
+	public MesosResourceManagerDriver(
+			Configuration flinkConfig,
+			MesosServices mesosServices,
+			MesosConfiguration mesosConfig,
+			MesosTaskManagerParameters taskManagerParameters,
+			ContainerSpecification taskManagerContainerSpec,
+			@Nullable String webUiUrl) {
+		super(flinkConfig, GlobalConfiguration.loadConfiguration());
+
+		this.mesosServices = Preconditions.checkNotNull(mesosServices);
+		this.actorFactory = Preconditions.checkNotNull(mesosServices.createMesosResourceManagerActorFactory());
+
+		this.mesosConfig = Preconditions.checkNotNull(mesosConfig);
+
+		this.artifactServer = Preconditions.checkNotNull(mesosServices.getArtifactServer());
+
+		this.taskManagerParameters = Preconditions.checkNotNull(taskManagerParameters);
+		this.taskManagerContainerSpec = Preconditions.checkNotNull(taskManagerContainerSpec);
+		this.webUiUrl = webUiUrl;
+
+		this.workersInNew = new HashMap<>();
+		this.requestResourceFutures = new HashMap<>();
+	}
+
+	// ------------------------------------------------------------------------
+	//  ResourceManagerDriver
+	// ------------------------------------------------------------------------
+
+	@Override
+	protected void initializeInternal() throws Exception {
+		// create and start the worker store
+		try {
+			this.workerStore = mesosServices.createMesosWorkerStore(flinkConfig);
+			workerStore.start();
+		} catch (Exception e) {
+			throw new ResourceManagerException("Unable to initialize the worker store.", e);
+		}
+
+		// Prepare to register with Mesos
+		Protos.FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo()
+				.clone()
+				.setCheckpoint(true);
+		if (webUiUrl != null) {
+			frameworkInfo.setWebuiUrl(webUiUrl);
+		}
+
+		try {
+			Option<Protos.FrameworkID> frameworkID = workerStore.getFrameworkID();
+			if (frameworkID.isEmpty()) {
+				log.info("Registering as new framework.");
+			} else {
+				log.info("Recovery scenario: re-registering using framework ID {}.", frameworkID.get().getValue());
+				frameworkInfo.setId(frameworkID.get());
+			}
+		} catch (Exception e) {
+			throw new ResourceManagerException("Unable to recover the framework ID.", e);
+		}
+
+		initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
+		MesosConfiguration.logMesosConfig(log, initializedMesosConfig);
+
+		this.selfActor = actorFactory.createSelfActorForMesosResourceManagerDriver(this);
+
+		// configure the artifact server to serve the TM container artifacts
+		try {
+			LaunchableMesosWorker.configureArtifactServer(artifactServer, taskManagerContainerSpec);
+		} catch (IOException e) {
+			throw new ResourceManagerException("Unable to configure the artifact server with TaskManager artifacts.", e);
+		}
+	}
+
+	@Override
+	public CompletableFuture<Void> terminate() {
+		return stopSupportingActorsAsync();
+	}
+
+	@Override
+	public CompletableFuture<Void> onGrantLeadership() {
+		Preconditions.checkState(initializedMesosConfig != null);
+
+		schedulerDriver = mesosServices.createMesosSchedulerDriver(
+				initializedMesosConfig,
+				new MesosResourceManagerSchedulerCallback(),
+				false);
+
+		// create supporting actors
+		connectionMonitor = actorFactory.createConnectionMonitor(flinkConfig);
+		launchCoordinator = actorFactory.createLaunchCoordinator(flinkConfig, selfActor, schedulerDriver, createOptimizer());
+		reconciliationCoordinator = actorFactory.createReconciliationCoordinator(flinkConfig, schedulerDriver);
+		taskMonitor = actorFactory.createTaskMonitor(flinkConfig, selfActor, schedulerDriver);
+
+		return getWorkersAsync().thenApplyAsync((tasksFromPreviousAttempts) -> {
+			// recover state
+			recoverWorkers(tasksFromPreviousAttempts);
+
+			// begin scheduling
+			connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
+			schedulerDriver.start();
+
+			log.info("Mesos resource manager started.");
+			return null;
+		}, getMainThreadExecutor());
+	}
+
+	@Override
+	public CompletableFuture<Void> onRevokeLeadership() {
+		schedulerDriver.stop(true);
+
+		workersInNew.clear();
+		requestResourceFutures.clear();
+
+		return stopSupportingActorsAsync();
+	}
+
+	@Override
+	public void deregisterApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) throws Exception {
+		log.info("Shutting down and unregistering as a Mesos framework.");
+
+		Exception exception = null;
+
+		try {
+			// unregister the framework, which implicitly removes all tasks.
+			schedulerDriver.stop(false);
+		} catch (Exception ex) {
+			exception = new Exception("Could not unregister the Mesos framework.", ex);
+		}
+
+		try {
+			workerStore.stop(true);
+		} catch (Exception ex) {
+			exception = ExceptionUtils.firstOrSuppressed(
+					new Exception("Could not stop the Mesos worker store.", ex),
+					exception);
+		}
+
+		if (exception != null) {
+			throw new ResourceManagerException("Could not properly shut down the Mesos application.", exception);
+		}
+	}
+
+	@Override
+	public CompletableFuture<RegisteredMesosWorkerNode> requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
+		Preconditions.checkArgument(Objects.equals(
+				taskExecutorProcessSpec,
+				taskManagerParameters.containeredParameters().getTaskExecutorProcessSpec()));
+		log.info("Starting a new worker.");
+
+		try {
+			// generate new workers into persistent state and launch associated actors
+			// TODO: arbitrary WorkerResourceSpec used here, which should be removed after removing MesosResourceManager.
+			MesosWorkerStore.Worker worker = MesosWorkerStore.Worker.newWorker(workerStore.newTaskID(), WorkerResourceSpec.ZERO);
+			workerStore.putWorker(worker);
+
+			final ResourceID resourceId = extractResourceID(worker.taskID());
+			workersInNew.put(resourceId, worker);
+
+			final CompletableFuture<RegisteredMesosWorkerNode> requestResourceFuture = new CompletableFuture<>();
+			requestResourceFutures.put(resourceId, requestResourceFuture);
+
+			LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());
+
+			log.info("Scheduling Mesos task {} with ({} MB, {} cpus, {} gpus, {} disk MB, {} Mbps).",
+					launchable.taskID().getValue(), launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs(),
+					launchable.taskRequest().getScalarRequests().get("gpus"), launchable.taskRequest().getDisk(), launchable.taskRequest().getNetworkMbps());
+
+			// tell the task monitor about the new plans
+			taskMonitor.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor);
+
+			// tell the launch coordinator to launch the new tasks
+			launchCoordinator.tell(new LaunchCoordinator.Launch(Collections.singletonList(launchable)), selfActor);
+
+			return requestResourceFuture;
+		} catch (Exception ex) {
+			final ResourceManagerException exception = new ResourceManagerException("Unable to request new workers.", ex);
+			getResourceEventHandler().onError(exception);
+			return FutureUtils.completedExceptionally(exception);
+		}
+	}
+
+	@Override
+	public void releaseResource(RegisteredMesosWorkerNode workerNode) {
+		try {
+			// update persistent state of worker to Released
+			MesosWorkerStore.Worker worker = workerNode.getWorker();
+			worker = worker.releaseWorker();
+			workerStore.putWorker(worker);
+
+			taskMonitor.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor);
+
+			if (worker.hostname().isDefined()) {
+				// tell the launch coordinator that the task is being unassigned from the host, for planning purposes
+				launchCoordinator.tell(new LaunchCoordinator.Unassign(worker.taskID(), worker.hostname().get()), selfActor);
+			}
+		} catch (Exception e) {
+			getResourceEventHandler().onError(new ResourceManagerException("Unable to release a worker.", e));
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Mesos Specific
+	// ------------------------------------------------------------------------
+
+	private void registered(Registered message) {
+		connectionMonitor.tell(message, selfActor);
+		try {
+			workerStore.setFrameworkID(Option.apply(message.frameworkId()));
+		} catch (Exception ex) {
+			getResourceEventHandler().onError(new ResourceManagerException("Unable to store the assigned framework ID.", ex));
+			return;
+		}
+
+		launchCoordinator.tell(message, selfActor);
+		reconciliationCoordinator.tell(message, selfActor);
+		taskMonitor.tell(message, selfActor);
+	}
+
+	/**
+	 * Called when reconnected to Mesos following a failover event.
+	 */
+	private void reregistered(ReRegistered message) {
+		connectionMonitor.tell(message, selfActor);
+		launchCoordinator.tell(message, selfActor);
+		reconciliationCoordinator.tell(message, selfActor);
+		taskMonitor.tell(message, selfActor);
+	}
+
+	/**
+	 * Called when disconnected from Mesos.
+	 */
+	private void disconnected(Disconnected message) {
+		connectionMonitor.tell(message, selfActor);
+		launchCoordinator.tell(message, selfActor);
+		reconciliationCoordinator.tell(message, selfActor);
+		taskMonitor.tell(message, selfActor);
+	}
+
+	/**
+	 * Called when resource offers are made to the framework.
+	 */
+	private void resourceOffers(ResourceOffers message) {
+		launchCoordinator.tell(message, selfActor);
+	}
+
+	/**
+	 * Called when resource offers are rescinded.
+	 */
+	private void offerRescinded(OfferRescinded message) {
+		launchCoordinator.tell(message, selfActor);
+	}
+
+	/**
+	 * Handles a task status update from Mesos.
+	 */
+	private void statusUpdate(StatusUpdate message) {
+		taskMonitor.tell(message, selfActor);
+		reconciliationCoordinator.tell(message, selfActor);
+		schedulerDriver.acknowledgeStatusUpdate(message.status());
+	}
+
+	/**
+	 * Accept offers as advised by the launch coordinator.
+	 *
+	 * <p>Acceptance is routed through the RM to update the persistent state before
+	 * forwarding the message to Mesos.
+	 */
+	private void acceptOffers(AcceptOffers msg) {
+		try {
+			List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new ArrayList<>(msg.operations().size());
+
+			// transition the persistent state of some tasks to Launched
+			for (Protos.Offer.Operation op : msg.operations()) {
+				if (op.getType() == Protos.Offer.Operation.Type.LAUNCH) {
+					for (Protos.TaskInfo info : op.getLaunch().getTaskInfosList()) {
+
+						final ResourceID resourceId = extractResourceID(info.getTaskId());
+
+						MesosWorkerStore.Worker worker = workersInNew.remove(resourceId);
+						assert (worker != null);
+
+						worker = worker.launchWorker(info.getSlaveId(), msg.hostname());
+						workerStore.putWorker(worker);
+
+						final CompletableFuture<RegisteredMesosWorkerNode> requestResourceFuture = requestResourceFutures.remove(resourceId);
+						assert (requestResourceFuture != null);
+
+						requestResourceFuture.complete(new RegisteredMesosWorkerNode(worker));
+
+						log.info("Launching Mesos task {} on host {}.",
+								worker.taskID().getValue(), worker.hostname().get());
+
+						toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
+					}
+				}
+			}
+
+			// tell the task monitor about the new plans
+			for (TaskMonitor.TaskGoalStateUpdated update : toMonitor) {
+				taskMonitor.tell(update, selfActor);
+			}
+
+			// send the acceptance message to Mesos
+			schedulerDriver.acceptOffers(msg.offerIds(), msg.operations(), msg.filters());
+		} catch (Exception ex) {
+			getResourceEventHandler().onError(new ResourceManagerException("Unable to accept offers.", ex));
+		}
+	}
+
+	/**
+	 * Handles a reconciliation request from a task monitor.
+	 */
+	private void reconcile(ReconciliationCoordinator.Reconcile message) {
+		// forward to the reconciliation coordinator
+		reconciliationCoordinator.tell(message, selfActor);
+	}
+
+	/**
+	 * Handles a termination notification from a task monitor.
+	 */
+	private void taskTerminated(TaskMonitor.TaskTerminated message) {
+		Protos.TaskID taskID = message.taskID();
+		Protos.TaskStatus status = message.status();
+
+		// note: this callback occurs for failed containers and for released containers alike
+		final ResourceID id = extractResourceID(taskID);
+
+		boolean existed;
+		try {
+			existed = workerStore.removeWorker(taskID);
+		} catch (Exception ex) {
+			getResourceEventHandler().onError(new ResourceManagerException("Unable to remove worker.", ex));
+			return;
+		}
+
+		if (!existed) {
+			log.info("Received a termination notice for an unrecognized worker: {}", id);
+			return;
+		}
+
+		// check if this is a failed task or a released task
+		assert (!workersInNew.containsKey(id));
+
+		final String diagnostics = extractTerminatedDiagnostics(id, status);
+		getResourceEventHandler().onWorkerTerminated(id, diagnostics);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Internal
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Fetches framework/worker information persisted by a prior incarnation of the RM.
+	 */
+	private CompletableFuture<List<MesosWorkerStore.Worker>> getWorkersAsync() {
+		// if this resource manager is recovering from failure,
+		// then some worker tasks are most likely still alive and we can re-obtain them
+		return CompletableFuture.supplyAsync(() -> {
+			try {
+				final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = workerStore.recoverWorkers();
+				for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
+					if (worker.state() == MesosWorkerStore.WorkerState.New) {
+						// remove new workers because allocation requests are transient
+						workerStore.removeWorker(worker.taskID());
+					}
+				}
+				return tasksFromPreviousAttempts;
+			} catch (final Exception e) {
+				throw new CompletionException(new ResourceManagerException(e));
+			}
+		});

Review comment:
       True. Robert introduced an `ioExecutor` for the resource managers in FLINK-19241. I think we can pass it from `ActiveResourceManager` via `ResourceManagerDriver#initialize`.
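
       For illustration, here is a minimal, self-contained sketch of that pattern, assuming the driver receives the executor at initialization. The class and method names below are hypothetical stand-ins, not the actual FLINK-19241 API:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only: the owner (standing in for ActiveResourceManager) hands its
// ioExecutor to the driver, and the blocking worker-store recovery runs on
// that executor instead of the ForkJoinPool common pool.
class IoExecutorSketch {

	private Executor ioExecutor;

	// stand-in for ResourceManagerDriver#initialize gaining an Executor parameter
	void initialize(Executor ioExecutor) {
		this.ioExecutor = ioExecutor;
	}

	// stand-in for getWorkersAsync(): supplyAsync is now given the ioExecutor
	CompletableFuture<List<String>> getWorkersAsync() {
		return CompletableFuture.supplyAsync(
				() -> Arrays.asList("worker-1", "worker-2"), // blocking recovery in real code
				ioExecutor);
	}

	public static void main(String[] args) {
		ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
		IoExecutorSketch driver = new IoExecutorSketch();
		driver.initialize(ioExecutor);
		System.out.println(driver.getWorkersAsync().join());
		ioExecutor.shutdown();
	}
}
```

       The only change relative to `getWorkersAsync` in the patch above would be the second argument to `CompletableFuture.supplyAsync`, so the blocking recovery I/O no longer runs on the common pool.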




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org