You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/31 10:58:49 UTC
[flink] branch master updated: [FLINK-11412] [mesos] Remove legacy
MesosFlinkResourceManager
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a43d3c8 [FLINK-11412] [mesos] Remove legacy MesosFlinkResourceManager
a43d3c8 is described below
commit a43d3c85884de1f45d4884b26265a53a69d6bee7
Author: tison <wa...@gmail.com>
AuthorDate: Tue Jan 29 11:54:37 2019 +0800
[FLINK-11412] [mesos] Remove legacy MesosFlinkResourceManager
---
.../src/main/resources/log4j.properties | 1 -
.../MesosFlinkResourceManager.java | 795 ---------------------
.../RegisteredMesosWorkerNode.java | 2 +-
flink-mesos/src/main/resources/log4j.properties | 1 -
.../MesosFlinkResourceManagerTest.java | 785 --------------------
.../clusterframework/MesosResourceManagerTest.java | 4 +-
6 files changed, 3 insertions(+), 1585 deletions(-)
diff --git a/flink-container/src/main/resources/log4j.properties b/flink-container/src/main/resources/log4j.properties
index 62cb6ed..dcbb808 100644
--- a/flink-container/src/main/resources/log4j.properties
+++ b/flink-container/src/main/resources/log4j.properties
@@ -24,4 +24,3 @@ log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log4j.logger.org.apache.flink.mesos=DEBUG
-log4j.logger.org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager=INFO
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
deleted file mode 100644
index dcf6a7e..0000000
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
+++ /dev/null
@@ -1,795 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.mesos.runtime.clusterframework;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.mesos.configuration.MesosOptions;
-import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
-import org.apache.flink.mesos.scheduler.ConnectionMonitor;
-import org.apache.flink.mesos.scheduler.LaunchCoordinator;
-import org.apache.flink.mesos.scheduler.LaunchableTask;
-import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
-import org.apache.flink.mesos.scheduler.SchedulerProxy;
-import org.apache.flink.mesos.scheduler.TaskMonitor;
-import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
-import org.apache.flink.mesos.scheduler.Tasks;
-import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
-import org.apache.flink.mesos.scheduler.messages.Disconnected;
-import org.apache.flink.mesos.scheduler.messages.Error;
-import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
-import org.apache.flink.mesos.scheduler.messages.ReRegistered;
-import org.apache.flink.mesos.scheduler.messages.Registered;
-import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
-import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
-import org.apache.flink.mesos.util.MesosArtifactResolver;
-import org.apache.flink.mesos.util.MesosConfiguration;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.ContainerSpecification;
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
-import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
-import org.apache.flink.runtime.clusterframework.messages.StopCluster;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import com.netflix.fenzo.TaskRequest;
-import com.netflix.fenzo.TaskScheduler;
-import com.netflix.fenzo.VirtualMachineLease;
-import com.netflix.fenzo.functions.Action1;
-import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.FrameworkInfo;
-import org.apache.mesos.Protos.FrameworkInfo.Capability;
-import org.apache.mesos.SchedulerDriver;
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import scala.Option;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Flink Resource Manager for Apache Mesos.
- */
-public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMesosWorkerNode> {
-
- /** The Mesos configuration (master and framework info). */
- private final MesosConfiguration mesosConfig;
-
- /** The TaskManager container parameters (like container memory size). */
- private final MesosTaskManagerParameters taskManagerParameters;
-
- /** Container specification for launching a TM. */
- private final ContainerSpecification taskManagerContainerSpec;
-
- /** Resolver for HTTP artifacts. **/
- private final MesosArtifactResolver artifactResolver;
-
- /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */
- private final int maxFailedTasks;
-
- /** Callback handler for the asynchronous Mesos scheduler. */
- private SchedulerProxy schedulerCallbackHandler;
-
- /** Mesos scheduler driver. */
- private SchedulerDriver schedulerDriver;
-
- private ActorRef connectionMonitor;
-
- private ActorRef taskRouter;
-
- private ActorRef launchCoordinator;
-
- private ActorRef reconciliationCoordinator;
-
- private final MesosWorkerStore workerStore;
-
- /** planning state related to workers - package private for unit test purposes. */
- final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
- final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
- final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
-
- /** The number of failed tasks since the master became active. */
- private int failedTasksSoFar;
-
- public MesosFlinkResourceManager(
- Configuration flinkConfig,
- MesosConfiguration mesosConfig,
- MesosWorkerStore workerStore,
- LeaderRetrievalService leaderRetrievalService,
- MesosTaskManagerParameters taskManagerParameters,
- ContainerSpecification taskManagerContainerSpec,
- MesosArtifactResolver artifactResolver,
- int maxFailedTasks,
- int numInitialTaskManagers) {
-
- super(numInitialTaskManagers, flinkConfig, leaderRetrievalService);
-
- this.mesosConfig = requireNonNull(mesosConfig);
-
- this.workerStore = requireNonNull(workerStore);
- this.artifactResolver = requireNonNull(artifactResolver);
-
- this.taskManagerParameters = requireNonNull(taskManagerParameters);
- this.taskManagerContainerSpec = requireNonNull(taskManagerContainerSpec);
- this.maxFailedTasks = maxFailedTasks;
-
- this.workersInNew = new HashMap<>();
- this.workersInLaunch = new HashMap<>();
- this.workersBeingReturned = new HashMap<>();
- }
-
- // ------------------------------------------------------------------------
- // Mesos-specific behavior
- // ------------------------------------------------------------------------
-
- @Override
- protected void initialize() throws Exception {
- LOG.info("Initializing Mesos resource master");
-
- workerStore.start();
-
- // create the scheduler driver to communicate with Mesos
- schedulerCallbackHandler = new SchedulerProxy(self());
-
- // register with Mesos
- FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo()
- .clone()
- .setCheckpoint(true);
-
- Option<Protos.FrameworkID> frameworkID = workerStore.getFrameworkID();
- if (frameworkID.isEmpty()) {
- LOG.info("Registering as new framework.");
- }
- else {
- LOG.info("Recovery scenario: re-registering using framework ID {}.", frameworkID.get().getValue());
- frameworkInfo.setId(frameworkID.get());
- }
-
- if (taskManagerParameters.gpus() > 0) {
- LOG.info("Add GPU_RESOURCES capability to framework");
- frameworkInfo.addCapabilities(Capability.newBuilder().setType(Capability.Type.GPU_RESOURCES));
- }
-
- MesosConfiguration initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
- MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig);
- schedulerDriver = initializedMesosConfig.createDriver(schedulerCallbackHandler, false);
-
- // create supporting actors
- connectionMonitor = createConnectionMonitor();
- launchCoordinator = createLaunchCoordinator();
- reconciliationCoordinator = createReconciliationCoordinator();
- taskRouter = createTaskRouter();
-
- recoverWorkers();
-
- connectionMonitor.tell(new ConnectionMonitor.Start(), self());
- schedulerDriver.start();
- }
-
- protected ActorRef createConnectionMonitor() {
- return context().actorOf(
- ConnectionMonitor.createActorProps(ConnectionMonitor.class, config),
- "connectionMonitor");
- }
-
- protected ActorRef createTaskRouter() {
- return context().actorOf(
- Tasks.createActorProps(Tasks.class, self(), config, schedulerDriver, TaskMonitor.class),
- "tasks");
- }
-
- protected ActorRef createLaunchCoordinator() {
- return context().actorOf(
- LaunchCoordinator.createActorProps(LaunchCoordinator.class, self(), config, schedulerDriver, createOptimizer()),
- "launchCoordinator");
- }
-
- protected ActorRef createReconciliationCoordinator() {
- return context().actorOf(
- ReconciliationCoordinator.createActorProps(ReconciliationCoordinator.class, config, schedulerDriver),
- "reconciliationCoordinator");
- }
-
- @Override
- public void postStop() {
- LOG.info("Stopping Mesos resource master");
- super.postStop();
- }
-
- // ------------------------------------------------------------------------
- // Actor messages
- // ------------------------------------------------------------------------
-
- @Override
- protected void handleMessage(Object message) {
-
- // check for Mesos-specific actor messages first
-
- // --- messages about Mesos connection
- if (message instanceof Registered) {
- registered((Registered) message);
- } else if (message instanceof ReRegistered) {
- reregistered((ReRegistered) message);
- } else if (message instanceof Disconnected) {
- disconnected((Disconnected) message);
- } else if (message instanceof Error) {
- error(((Error) message).message());
-
- // --- messages about offers
- } else if (message instanceof ResourceOffers || message instanceof OfferRescinded) {
- launchCoordinator.tell(message, self());
- } else if (message instanceof AcceptOffers) {
- acceptOffers((AcceptOffers) message);
-
- // --- messages about tasks
- } else if (message instanceof StatusUpdate) {
- taskStatusUpdated((StatusUpdate) message);
- } else if (message instanceof ReconciliationCoordinator.Reconcile) {
- // a reconciliation request from a task
- reconciliationCoordinator.tell(message, self());
- } else if (message instanceof TaskMonitor.TaskTerminated) {
- // a termination message from a task
- TaskMonitor.TaskTerminated msg = (TaskMonitor.TaskTerminated) message;
- taskTerminated(msg.taskID(), msg.status());
-
- } else {
- // message handled by the generic resource master code
- super.handleMessage(message);
- }
- }
-
- /**
- * Called to shut down the cluster (not a failover situation).
- *
- * @param finalStatus The application status to report.
- * @param optionalDiagnostics An optional diagnostics message.
- */
- @Override
- protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
-
- LOG.info("Shutting down and unregistering as a Mesos framework.");
- try {
- // unregister the framework, which implicitly removes all tasks.
- schedulerDriver.stop(false);
- } catch (Exception ex) {
- LOG.warn("unable to unregister the framework", ex);
- }
-
- try {
- workerStore.stop(true);
- } catch (Exception ex) {
- LOG.warn("unable to stop the worker state store", ex);
- }
-
- context().stop(self());
- }
-
- @Override
- protected void fatalError(String message, Throwable error) {
- // we do not unregister, but cause a hard fail of this process, to have it
- // restarted by the dispatcher
- LOG.error("FATAL ERROR IN MESOS APPLICATION MASTER: " + message, error);
- LOG.error("Shutting down process");
-
- // kill this process, this will make an external supervisor (the dispatcher) restart the process
- System.exit(EXIT_CODE_FATAL_ERROR);
- }
-
- // ------------------------------------------------------------------------
- // Worker Management
- // ------------------------------------------------------------------------
-
- /**
- * Recover framework/worker information persisted by a prior incarnation of the RM.
- */
- private void recoverWorkers() throws Exception {
- // if this application master starts as part of an ApplicationMaster/JobManager recovery,
- // then some worker tasks are most likely still alive and we can re-obtain them
- final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = workerStore.recoverWorkers();
-
- if (!tasksFromPreviousAttempts.isEmpty()) {
- LOG.info("Retrieved {} TaskManagers from previous attempt", tasksFromPreviousAttempts.size());
-
- List<Tuple2<TaskRequest, String>> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size());
- List<LaunchableTask> toLaunch = new ArrayList<>(tasksFromPreviousAttempts.size());
-
- for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
- LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());
-
- switch (worker.state()) {
- case New:
- workersInNew.put(extractResourceID(worker.taskID()), worker);
- toLaunch.add(launchable);
- break;
- case Launched:
- workersInLaunch.put(extractResourceID(worker.taskID()), worker);
- toAssign.add(new Tuple2<>(launchable.taskRequest(), worker.hostname().get()));
- break;
- case Released:
- workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
- break;
- }
- taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self());
- }
-
- // tell the launch coordinator about prior assignments
- if (toAssign.size() >= 1) {
- launchCoordinator.tell(new LaunchCoordinator.Assign(toAssign), self());
- }
- // tell the launch coordinator to launch any new tasks
- if (toLaunch.size() >= 1) {
- launchCoordinator.tell(new LaunchCoordinator.Launch(toLaunch), self());
- }
- }
- }
-
- /**
- * Plan for some additional workers to be launched.
- *
- * @param numWorkers The number of workers to allocate.
- */
- @Override
- protected void requestNewWorkers(int numWorkers) {
-
- try {
- List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new ArrayList<>(numWorkers);
- List<LaunchableTask> toLaunch = new ArrayList<>(numWorkers);
-
- // generate new workers into persistent state and launch associated actors
- for (int i = 0; i < numWorkers; i++) {
- MesosWorkerStore.Worker worker = MesosWorkerStore.Worker.newWorker(workerStore.newTaskID());
- workerStore.putWorker(worker);
- workersInNew.put(extractResourceID(worker.taskID()), worker);
-
- LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());
-
- LOG.info("Scheduling Mesos task {} with ({} MB, {} cpus, {} gpus).",
- launchable.taskID().getValue(), launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs(),
- launchable.taskRequest().getScalarRequests().get("gpus"));
-
- toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
- toLaunch.add(launchable);
- }
-
- // tell the task router about the new plans
- for (TaskMonitor.TaskGoalStateUpdated update : toMonitor) {
- taskRouter.tell(update, self());
- }
-
- // tell the launch coordinator to launch the new tasks
- if (toLaunch.size() >= 1) {
- launchCoordinator.tell(new LaunchCoordinator.Launch(toLaunch), self());
- }
- } catch (Exception ex) {
- fatalError("unable to request new workers", ex);
- }
- }
-
- /**
- * Accept offers as advised by the launch coordinator.
- *
- * <p>Acceptance is routed through the RM to update the persistent state before
- * forwarding the message to Mesos.
- */
- private void acceptOffers(AcceptOffers msg) {
-
- try {
- List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new ArrayList<>(msg.operations().size());
-
- // transition the persistent state of some tasks to Launched
- for (Protos.Offer.Operation op : msg.operations()) {
- if (op.getType() != Protos.Offer.Operation.Type.LAUNCH) {
- continue;
- }
- for (Protos.TaskInfo info : op.getLaunch().getTaskInfosList()) {
- MesosWorkerStore.Worker worker = workersInNew.remove(extractResourceID(info.getTaskId()));
- assert (worker != null);
-
- worker = worker.launchWorker(info.getSlaveId(), msg.hostname());
- workerStore.putWorker(worker);
- workersInLaunch.put(extractResourceID(worker.taskID()), worker);
-
- LOG.info("Launching Mesos task {} on host {}.",
- worker.taskID().getValue(), worker.hostname().get());
-
- toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
- }
- }
-
- // tell the task router about the new plans
- for (TaskMonitor.TaskGoalStateUpdated update : toMonitor) {
- taskRouter.tell(update, self());
- }
-
- // send the acceptance message to Mesos
- schedulerDriver.acceptOffers(msg.offerIds(), msg.operations(), msg.filters());
- } catch (Exception ex) {
- fatalError("unable to accept offers", ex);
- }
- }
-
- /**
- * Handle a task status change.
- */
- private void taskStatusUpdated(StatusUpdate message) {
- taskRouter.tell(message, self());
- reconciliationCoordinator.tell(message, self());
- schedulerDriver.acknowledgeStatusUpdate(message.status());
- }
-
- /**
- * Accept the given started worker into the internal state.
- *
- * @param resourceID The worker resource id
- * @return A registered worker node record.
- */
- @Override
- protected RegisteredMesosWorkerNode workerStarted(ResourceID resourceID) {
- MesosWorkerStore.Worker inLaunch = workersInLaunch.remove(resourceID);
- if (inLaunch == null) {
- // Worker was not in state "being launched", this can indicate that the TaskManager
- // in this worker was already registered or that the container was not started
- // by this resource manager. Simply ignore this resourceID.
- return null;
- }
- return new RegisteredMesosWorkerNode(inLaunch);
- }
-
- /**
- * Accept the given registered workers into the internal state.
- *
- * @param toConsolidate The worker IDs known previously to the JobManager.
- * @return A collection of registered worker node records.
- */
- @Override
- protected Collection<RegisteredMesosWorkerNode> reacceptRegisteredWorkers(Collection<ResourceID> toConsolidate) {
-
- // we check for each task manager if we recognize its Mesos task ID
- List<RegisteredMesosWorkerNode> accepted = new ArrayList<>(toConsolidate.size());
- for (ResourceID resourceID : toConsolidate) {
- MesosWorkerStore.Worker worker = workersInLaunch.remove(resourceID);
- if (worker != null) {
- LOG.info("Mesos worker consolidation recognizes TaskManager {}.", resourceID);
- accepted.add(new RegisteredMesosWorkerNode(worker));
- } else {
- if (isStarted(resourceID)) {
- LOG.info("TaskManager {} has already been registered at the resource manager.", resourceID);
- }
- else {
- LOG.info("Mesos worker consolidation does not recognize TaskManager {}.", resourceID);
- }
- }
- }
- return accepted;
- }
-
- /**
- * Release the given pending worker.
- */
- @Override
- protected void releasePendingWorker(ResourceID id) {
- MesosWorkerStore.Worker worker = workersInLaunch.remove(id);
- if (worker != null) {
- releaseWorker(worker);
- } else {
- LOG.error("Cannot find worker {} to release. Ignoring request.", id);
- }
- }
-
- /**
- * Release the given started worker.
- */
- @Override
- protected void releaseStartedWorker(RegisteredMesosWorkerNode worker) {
- releaseWorker(worker.getWorker());
- }
-
- /**
- * Plan for the removal of the given worker.
- */
- private void releaseWorker(MesosWorkerStore.Worker worker) {
- try {
- LOG.info("Releasing worker {}", worker.taskID());
-
- // update persistent state of worker to Released
- worker = worker.releaseWorker();
- workerStore.putWorker(worker);
- workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
- taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self());
-
- if (worker.hostname().isDefined()) {
- // tell the launch coordinator that the task is being unassigned from the host, for planning purposes
- launchCoordinator.tell(new LaunchCoordinator.Unassign(worker.taskID(), worker.hostname().get()), self());
- }
- }
- catch (Exception ex) {
- fatalError("unable to release worker", ex);
- }
- }
-
- @Override
- protected int getNumWorkerRequestsPending() {
- return workersInNew.size();
- }
-
- @Override
- protected int getNumWorkersPendingRegistration() {
- return workersInLaunch.size();
- }
-
- // ------------------------------------------------------------------------
- // Callbacks from the Mesos Master
- // ------------------------------------------------------------------------
-
- /**
- * Called when connected to Mesos as a new framework.
- */
- private void registered(Registered message) {
- connectionMonitor.tell(message, self());
-
- try {
- workerStore.setFrameworkID(Option.apply(message.frameworkId()));
- } catch (Exception ex) {
- fatalError("unable to store the assigned framework ID", ex);
- return;
- }
-
- launchCoordinator.tell(message, self());
- reconciliationCoordinator.tell(message, self());
- taskRouter.tell(message, self());
- }
-
- /**
- * Called when reconnected to Mesos following a failover event.
- */
- private void reregistered(ReRegistered message) {
- connectionMonitor.tell(message, self());
- launchCoordinator.tell(message, self());
- reconciliationCoordinator.tell(message, self());
- taskRouter.tell(message, self());
- }
-
- /**
- * Called when disconnected from Mesos.
- */
- private void disconnected(Disconnected message) {
- connectionMonitor.tell(message, self());
- launchCoordinator.tell(message, self());
- reconciliationCoordinator.tell(message, self());
- taskRouter.tell(message, self());
- }
-
- /**
- * Called when an error is reported by the scheduler callback.
- */
- private void error(String message) {
- self().tell(new FatalErrorOccurred("Connection to Mesos failed", new Exception(message)), self());
- }
-
- /**
- * Invoked when a Mesos task reaches a terminal status.
- */
- private void taskTerminated(Protos.TaskID taskID, Protos.TaskStatus status) {
- // this callback occurs for failed containers and for released containers alike
-
- final ResourceID id = extractResourceID(taskID);
-
- boolean existed;
- try {
- existed = workerStore.removeWorker(taskID);
- } catch (Exception ex) {
- fatalError("unable to remove worker", ex);
- return;
- }
-
- if (!existed) {
- LOG.info("Received a termination notice for an unrecognized worker: {}", id);
- return;
- }
-
- // check if this is a failed task or a released task
- if (workersBeingReturned.remove(id) != null) {
- // regular finished worker that we released
- LOG.info("Worker {} finished successfully with diagnostics: {}",
- id, status.getMessage());
- } else {
- // failed worker, either at startup, or running
- final MesosWorkerStore.Worker launched = workersInLaunch.remove(id);
- if (launched != null) {
- LOG.info("Mesos task {} failed, with a TaskManager in launch or registration. " +
- "State: {} Reason: {} ({})", id, status.getState(), status.getReason(), status.getMessage());
- // we will trigger re-acquiring new workers at the end
- } else {
- // failed registered worker
- LOG.info("Mesos task {} failed, with a registered TaskManager. " +
- "State: {} Reason: {} ({})", id, status.getState(), status.getReason(), status.getMessage());
-
- // notify the generic logic, which notifies the JobManager, etc.
- notifyWorkerFailed(id, "Mesos task " + id + " failed. State: " + status.getState());
- }
-
- // general failure logging
- failedTasksSoFar++;
-
- String diagMessage = String.format("Diagnostics for task %s in state %s : " +
- "reason=%s message=%s",
- id, status.getState(), status.getReason(), status.getMessage());
- sendInfoMessage(diagMessage);
-
- LOG.info(diagMessage);
- LOG.info("Total number of failed tasks so far: {}", failedTasksSoFar);
-
- // maxFailedTasks == -1 is infinite number of retries.
- if (maxFailedTasks >= 0 && failedTasksSoFar > maxFailedTasks) {
- String msg = "Stopping Mesos session because the number of failed tasks ("
- + failedTasksSoFar + ") exceeded the maximum failed tasks ("
- + maxFailedTasks + "). This number is controlled by the '"
- + MesosOptions.MAX_FAILED_TASKS.key() + "' configuration setting. "
- + "By default its the number of requested tasks.";
-
- LOG.error(msg);
- self().tell(decorateMessage(new StopCluster(ApplicationStatus.FAILED, msg)),
- ActorRef.noSender());
-
- // no need to do anything else
- return;
- }
- }
-
- // in case failed containers were among the finished containers, make
- // sure we re-examine and request new ones
- triggerCheckWorkers();
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) {
- LaunchableMesosWorker launchable =
- new LaunchableMesosWorker(
- artifactResolver,
- taskManagerParameters,
- taskManagerContainerSpec,
- taskID,
- mesosConfig);
-
- return launchable;
- }
-
- /**
- * Extracts a unique ResourceID from the Mesos task.
- *
- * @param taskId the Mesos TaskID
- * @return The ResourceID for the container
- */
- static ResourceID extractResourceID(Protos.TaskID taskId) {
- return new ResourceID(taskId.getValue());
- }
-
- /**
- * Extracts the Mesos task goal state from the worker information.
- * @param worker the persistent worker information.
- * @return goal state information for the {@Link TaskMonitor}.
- */
- static TaskMonitor.TaskGoalState extractGoalState(MesosWorkerStore.Worker worker) {
- switch (worker.state()) {
- case New:
- return new TaskMonitor.New(worker.taskID());
- case Launched:
- return new TaskMonitor.Launched(worker.taskID(), worker.slaveID().get());
- case Released:
- return new TaskMonitor.Released(worker.taskID(), worker.slaveID().get());
- default:
- throw new IllegalArgumentException("unsupported worker state");
- }
- }
-
- /**
- * Creates the Fenzo optimizer (builder).
- * The builder is an indirection to facilitate unit testing of the Launch Coordinator.
- */
- private static TaskSchedulerBuilder createOptimizer() {
- return new TaskSchedulerBuilder() {
- TaskScheduler.Builder builder = new TaskScheduler.Builder();
-
- @Override
- public TaskSchedulerBuilder withLeaseRejectAction(Action1<VirtualMachineLease> action) {
- builder.withLeaseRejectAction(action);
- return this;
- }
-
- @Override
- public TaskScheduler build() {
- return builder.build();
- }
- };
- }
-
- /**
- * Creates the props needed to instantiate this actor.
- *
- * <p>Rather than extracting and validating parameters in the constructor, this factory method takes
- * care of that. That way, errors occur synchronously, and are not swallowed simply in a
- * failed asynchronous attempt to start the actor.
-
- * @param actorClass
- * The actor class, to allow overriding this actor with subclasses for testing.
- * @param flinkConfig
- * The Flink configuration object.
- * @param taskManagerParameters
- * The parameters for launching TaskManager containers.
- * @param taskManagerContainerSpec
- * The container specification.
- * @param artifactResolver
- * The artifact resolver to locate artifacts
- * @param log
- * The logger to log to.
- *
- * @return The Props object to instantiate the MesosFlinkResourceManager actor.
- */
- public static Props createActorProps(
- Class<? extends MesosFlinkResourceManager> actorClass,
- Configuration flinkConfig,
- MesosConfiguration mesosConfig,
- MesosWorkerStore workerStore,
- LeaderRetrievalService leaderRetrievalService,
- MesosTaskManagerParameters taskManagerParameters,
- ContainerSpecification taskManagerContainerSpec,
- MesosArtifactResolver artifactResolver,
- Logger log) {
-
- final int numInitialTaskManagers = flinkConfig.getInteger(
- MesosOptions.INITIAL_TASKS);
- if (numInitialTaskManagers >= 0) {
- log.info("Mesos framework to allocate {} initial tasks",
- numInitialTaskManagers);
- }
- else {
- throw new IllegalConfigurationException("Invalid value for " +
- MesosOptions.INITIAL_TASKS.key() + ", which must be at least zero.");
- }
-
- final int maxFailedTasks = flinkConfig.getInteger(
- MesosOptions.MAX_FAILED_TASKS.key(), numInitialTaskManagers);
- if (maxFailedTasks >= 0) {
- log.info("Mesos framework tolerates {} failed tasks before giving up",
- maxFailedTasks);
- }
-
- return Props.create(actorClass,
- flinkConfig,
- mesosConfig,
- workerStore,
- leaderRetrievalService,
- taskManagerParameters,
- taskManagerContainerSpec,
- artifactResolver,
- maxFailedTasks,
- numInitialTaskManagers);
- }
-}
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.java
index c65c482..e12e6b8 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.Preconditions;
import java.io.Serializable;
/**
- * A representation of a registered Mesos task managed by the {@link MesosFlinkResourceManager}.
+ * A representation of a registered Mesos task managed by the {@link MesosResourceManager}.
*/
public class RegisteredMesosWorkerNode implements Serializable, ResourceIDRetrievable {
diff --git a/flink-mesos/src/main/resources/log4j.properties b/flink-mesos/src/main/resources/log4j.properties
index 61e5afe..64d49ec 100644
--- a/flink-mesos/src/main/resources/log4j.properties
+++ b/flink-mesos/src/main/resources/log4j.properties
@@ -24,4 +24,3 @@ log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log4j.logger.org.apache.flink.mesos=DEBUG
-log4j.logger.org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager=INFO
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
deleted file mode 100644
index 31394b2..0000000
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ /dev/null
@@ -1,785 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.mesos.runtime.clusterframework;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.mesos.configuration.MesosOptions;
-import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
-import org.apache.flink.mesos.scheduler.ConnectionMonitor;
-import org.apache.flink.mesos.scheduler.LaunchCoordinator;
-import org.apache.flink.mesos.scheduler.TaskMonitor;
-import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
-import org.apache.flink.mesos.scheduler.messages.Disconnected;
-import org.apache.flink.mesos.scheduler.messages.Error;
-import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
-import org.apache.flink.mesos.scheduler.messages.ReRegistered;
-import org.apache.flink.mesos.scheduler.messages.Registered;
-import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
-import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
-import org.apache.flink.mesos.util.MesosArtifactResolver;
-import org.apache.flink.mesos.util.MesosConfiguration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.ContainerSpecification;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
-import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
-import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
-import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
-import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize;
-import org.apache.flink.runtime.clusterframework.messages.StopCluster;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-import akka.testkit.TestProbe;
-import com.netflix.fenzo.ConstraintEvaluator;
-import junit.framework.AssertionFailedError;
-import org.apache.mesos.Protos;
-import org.apache.mesos.Scheduler;
-import org.apache.mesos.SchedulerDriver;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-
-import scala.Option;
-
-import static java.util.Collections.singletonList;
-import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractGoalState;
-import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractResourceID;
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasEntry;
-import static org.hamcrest.Matchers.hasSize;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * General tests for the Mesos resource manager component.
- */
-public class MesosFlinkResourceManagerTest extends TestLogger {
-
- private static final Logger LOG = LoggerFactory.getLogger(MesosFlinkResourceManagerTest.class);
-
- private static ActorSystem system;
-
- private static Configuration config = new Configuration() {
- private static final long serialVersionUID = -952579203067648838L;
-
- {
- setInteger(MesosOptions.MAX_FAILED_TASKS, -1);
- setInteger(MesosOptions.INITIAL_TASKS, 0);
- }};
-
- @BeforeClass
- public static void setup() {
- system = AkkaUtils.createLocalActorSystem(config);
- }
-
- @AfterClass
- public static void teardown() {
- JavaTestKit.shutdownActorSystem(system);
- }
-
- /**
- * The RM with some test-specific behavior.
- */
- static class TestingMesosFlinkResourceManager extends MesosFlinkResourceManager {
-
- public TestProbe connectionMonitor = new TestProbe(system);
- public TestProbe taskRouter = new TestProbe(system);
- public TestProbe launchCoordinator = new TestProbe(system);
- public TestProbe reconciliationCoordinator = new TestProbe(system);
-
- public TestingMesosFlinkResourceManager(
- Configuration flinkConfig,
- MesosConfiguration mesosConfig,
- MesosWorkerStore workerStore,
- LeaderRetrievalService leaderRetrievalService,
- MesosTaskManagerParameters taskManagerParameters,
- ContainerSpecification taskManagerContainerSpec,
- MesosArtifactResolver artifactResolver,
- int maxFailedTasks,
- int numInitialTaskManagers) {
-
- super(flinkConfig, mesosConfig, workerStore, leaderRetrievalService, taskManagerParameters,
- taskManagerContainerSpec, artifactResolver, maxFailedTasks, numInitialTaskManagers);
- }
-
- @Override
- protected ActorRef createConnectionMonitor() {
- return connectionMonitor.ref();
- }
-
- @Override
- protected ActorRef createTaskRouter() {
- return taskRouter.ref();
- }
-
- @Override
- protected ActorRef createLaunchCoordinator() {
- return launchCoordinator.ref();
- }
-
- @Override
- protected ActorRef createReconciliationCoordinator() {
- return reconciliationCoordinator.ref();
- }
-
- @Override
- protected void fatalError(String message, Throwable error) {
- // override the super's behavior of exiting the process
- context().stop(self());
- }
- }
-
- /**
- * The context fixture.
- */
- static class Context extends JavaTestKit implements AutoCloseable {
-
- // mocks
- public ActorGateway jobManager;
- public MesosConfiguration mesosConfig;
- public MesosWorkerStore workerStore;
- public MesosArtifactResolver artifactResolver;
- public SchedulerDriver schedulerDriver;
- public TestingMesosFlinkResourceManager resourceManagerInstance;
- public ActorGateway resourceManager;
-
- // domain objects for test purposes
- Protos.FrameworkID framework1 = Protos.FrameworkID.newBuilder().setValue("framework1").build();
- public Protos.SlaveID slave1 = Protos.SlaveID.newBuilder().setValue("slave1").build();
- public String slave1host = "localhost";
- public Protos.OfferID offer1 = Protos.OfferID.newBuilder().setValue("offer1").build();
- public Protos.TaskID task1 = Protos.TaskID.newBuilder().setValue("taskmanager-00001").build();
- public Protos.TaskID task2 = Protos.TaskID.newBuilder().setValue("taskmanager-00002").build();
- public Protos.TaskID task3 = Protos.TaskID.newBuilder().setValue("taskmanager-00003").build();
-
- private final TestingHighAvailabilityServices highAvailabilityServices;
-
- /**
- * Create mock RM dependencies.
- */
- public Context() {
- super(system);
-
- try {
- jobManager = TestingUtils.createForwardingActor(
- system,
- getTestActor(),
- HighAvailabilityServices.DEFAULT_LEADER_ID,
- Option.<String>empty());
-
- highAvailabilityServices = new TestingHighAvailabilityServices();
-
- highAvailabilityServices.setJobMasterLeaderRetriever(
- HighAvailabilityServices.DEFAULT_JOB_ID,
- new SettableLeaderRetrievalService(
- jobManager.path(),
- HighAvailabilityServices.DEFAULT_LEADER_ID));
-
- // scheduler driver
- schedulerDriver = mock(SchedulerDriver.class);
-
- // config
- mesosConfig = mock(MesosConfiguration.class);
- when(mesosConfig.frameworkInfo()).thenReturn(Protos.FrameworkInfo.newBuilder());
- when(mesosConfig.withFrameworkInfo(any(Protos.FrameworkInfo.Builder.class))).thenReturn(mesosConfig);
- when(mesosConfig.createDriver(any(Scheduler.class), anyBoolean())).thenReturn(schedulerDriver);
-
- // worker store
- workerStore = mock(MesosWorkerStore.class);
- when(workerStore.getFrameworkID()).thenReturn(Option.<Protos.FrameworkID>empty());
-
- // artifact
- artifactResolver = mock(MesosArtifactResolver.class);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
-
- /**
- * Initialize the resource manager.
- */
- public void initialize() {
- ContainerSpecification containerSpecification = new ContainerSpecification();
- ContaineredTaskManagerParameters containeredParams =
- new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
- MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
- 1.0, 1,
- MesosTaskManagerParameters.ContainerType.MESOS,
- Option.<String>empty(),
- containeredParams,
- Collections.<Protos.Volume>emptyList(),
- Collections.<Protos.Parameter>emptyList(),
- false,
- Collections.<ConstraintEvaluator>emptyList(),
- "",
- Option.<String>empty(),
- Option.<String>empty(),
- Collections.<String>emptyList());
-
- TestActorRef<TestingMesosFlinkResourceManager> resourceManagerRef =
- TestActorRef.create(system, MesosFlinkResourceManager.createActorProps(
- TestingMesosFlinkResourceManager.class,
- config,
- mesosConfig,
- workerStore,
- highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
- tmParams,
- containerSpecification,
- artifactResolver,
- LOG));
- resourceManagerInstance = resourceManagerRef.underlyingActor();
- resourceManager = new AkkaActorGateway(resourceManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
-
- verify(schedulerDriver).start();
- resourceManagerInstance.connectionMonitor.expectMsgClass(ConnectionMonitor.Start.class);
- }
-
- /**
- * Send a RegisterResourceManagerSuccessful message to the RM.
- * @param currentlyRegisteredTaskManagers the already-registered workers.
- */
- public void register(Collection<ResourceID> currentlyRegisteredTaskManagers) {
- // register with JM
- expectMsgClass(RegisterResourceManager.class);
- resourceManager.tell(
- new RegisterResourceManagerSuccessful(jobManager.actor(), currentlyRegisteredTaskManagers),
- jobManager);
- }
-
- /**
- * Prepares a launch operation.
- */
- public Protos.Offer.Operation launch(Protos.TaskInfo... taskInfo) {
- return Protos.Offer.Operation.newBuilder()
- .setType(Protos.Offer.Operation.Type.LAUNCH)
- .setLaunch(Protos.Offer.Operation.Launch.newBuilder().addAllTaskInfos(Arrays.asList(taskInfo))
- ).build();
- }
-
- @Override
- public void close() throws Exception {
- highAvailabilityServices.closeAndCleanupAllData();
- }
- }
-
- /**
- * Test recovery of persistent workers.
- */
- @Test
- public void testRecoverWorkers() {
- new Context() {{
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
- try {
- // set the initial persistent state then initialize the RM
- MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newWorker(task1);
- MesosWorkerStore.Worker worker2 = MesosWorkerStore.Worker.newWorker(task2).launchWorker(slave1, slave1host);
- MesosWorkerStore.Worker worker3 = MesosWorkerStore.Worker.newWorker(task3).launchWorker(slave1, slave1host).releaseWorker();
- when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
- when(workerStore.recoverWorkers()).thenReturn(Arrays.asList(worker1, worker2, worker3));
- initialize();
-
- // verify that the internal state was updated, the task router was notified,
- // and the launch coordinator was asked to launch a task
- assertThat(resourceManagerInstance.workersInNew, hasEntry(extractResourceID(task1), worker1));
- assertThat(resourceManagerInstance.workersInLaunch, hasEntry(extractResourceID(task2), worker2));
- assertThat(resourceManagerInstance.workersBeingReturned, hasEntry(extractResourceID(task3), worker3));
- resourceManagerInstance.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class);
- LaunchCoordinator.Assign actualAssign =
- resourceManagerInstance.launchCoordinator.expectMsgClass(LaunchCoordinator.Assign.class);
- assertThat(actualAssign.tasks(), hasSize(1));
- assertThat(actualAssign.tasks().get(0).f0.getId(), equalTo(task2.getValue()));
- assertThat(actualAssign.tasks().get(0).f1, equalTo(slave1host));
- resourceManagerInstance.launchCoordinator.expectMsgClass(LaunchCoordinator.Launch.class);
-
- register(Collections.<ResourceID>emptyList());
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
- };
- }};
- }
-
- /**
- * Test re-acceptance of registered workers upon JM registration.
- */
- @Test
- public void testReacceptRegisteredWorkers() {
- new Context() {{
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
- try {
- // set the initial persistent state then initialize the RM
- MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
- when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
- when(workerStore.recoverWorkers()).thenReturn(singletonList(worker1launched));
- initialize();
-
- // send RegisterResourceManagerSuccessful to the RM with some 'known' workers.
- // This will cause the RM to reaccept the workers.
- assertThat(resourceManagerInstance.workersInLaunch, hasEntry(extractResourceID(task1), worker1launched));
- register(singletonList(extractResourceID(task1)));
- assertThat(resourceManagerInstance.workersInLaunch.entrySet(), empty());
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
- };
- }};
- }
-
- /**
- * Test normal worker registration.
- */
- @Test
- public void testWorkerRegistered() {
- new Context() {{
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
- try {
- // set the initial state with a (recovered) launched worker
- MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
- when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
- when(workerStore.recoverWorkers()).thenReturn(singletonList(worker1launched));
- initialize();
- assertThat(resourceManagerInstance.workersInLaunch, hasEntry(extractResourceID(task1), worker1launched));
- register(Collections.<ResourceID>emptyList());
-
- // send registration message
- NotifyResourceStarted msg = new NotifyResourceStarted(extractResourceID(task1));
- resourceManager.tell(msg);
-
- // verify that the internal state was updated
- assertThat(resourceManagerInstance.workersInLaunch.entrySet(), empty());
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
- };
- }};
- }
-
- /**
- * Test release of registered workers.
- */
- @Test
- public void testReleaseRegisteredWorker() {
- new Context() {{
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
- try {
- // set the initial persistent state, initialize the RM, then register with task1 as a registered worker
- MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
- when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
- when(workerStore.recoverWorkers()).thenReturn(singletonList(worker1));
- initialize();
- resourceManagerInstance.launchCoordinator.expectMsgClass(LaunchCoordinator.Assign.class);
- register(singletonList(extractResourceID(task1)));
-
- // release the registered worker
- resourceManager.tell(new RemoveResource(extractResourceID(task1)));
-
- // verify that the worker was persisted, the internal state was updated, the task router was notified,
- // and the launch coordinator was notified about the host assignment change
- MesosWorkerStore.Worker worker2Released = worker1.releaseWorker();
- verify(workerStore).putWorker(worker2Released);
- assertThat(resourceManagerInstance.workersBeingReturned, hasEntry(extractResourceID(task1), worker2Released));
- resourceManagerInstance.launchCoordinator.expectMsg(new LaunchCoordinator.Unassign(task1, slave1host));
-
- // send the subsequent terminated message
- when(workerStore.removeWorker(task1)).thenReturn(true);
- resourceManager.tell(new TaskMonitor.TaskTerminated(task1, Protos.TaskStatus.newBuilder()
- .setTaskId(task1).setSlaveId(slave1).setState(Protos.TaskState.TASK_FINISHED).build()));
-
- // verify that the instance state was updated
- assertThat(resourceManagerInstance.workersBeingReturned.entrySet(), empty());
- verify(workerStore).removeWorker(task1);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
- };
- }};
- }
-
- /**
- * Test request for new workers.
- */
- @Test
- public void testRequestNewWorkers() {
- new Context() {{
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
- try {
- initialize();
- register(Collections.<ResourceID>emptyList());
-
- // set the target pool size
- when(workerStore.newTaskID()).thenReturn(task1).thenThrow(new AssertionFailedError());
- resourceManager.tell(new SetWorkerPoolSize(1), jobManager);
-
- // verify that a new worker was persisted, the internal state was updated, the task router was notified,
- // and the launch coordinator was asked to launch a task
- MesosWorkerStore.Worker expected = MesosWorkerStore.Worker.newWorker(task1);
- verify(workerStore).putWorker(expected);
- assertThat(resourceManagerInstance.workersInNew, hasEntry(extractResourceID(task1), expected));
- resourceManagerInstance.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class);
- resourceManagerInstance.launchCoordinator.expectMsgClass(LaunchCoordinator.Launch.class);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
- };
- }};
- }
-
- /**
- * Test offer handling.
- */
- @Test
- public void testOfferHandling() {
- new Context() {{
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
- initialize();
- register(Collections.<ResourceID>emptyList());
-
- // Verify that the RM forwards offers to the launch coordinator.
- resourceManager.tell(new ResourceOffers(Collections.<Protos.Offer>emptyList()));
- resourceManagerInstance.launchCoordinator.expectMsgClass(ResourceOffers.class);
- resourceManager.tell(new OfferRescinded(offer1));
- resourceManagerInstance.launchCoordinator.expectMsgClass(OfferRescinded.class);
- }
- };
- }};
- }
-
- /**
- * Test offer acceptance.
- */
- @Test
- public void testAcceptOffers() {
- new Context() {{
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
- try {
- // set the initial persistent state with a new task then initialize the RM
- MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newWorker(task1);
- when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
- when(workerStore.recoverWorkers()).thenReturn(singletonList(worker1));
- initialize();
- assertThat(resourceManagerInstance.workersInNew, hasEntry(extractResourceID(task1), worker1));
- resourceManagerInstance.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class);
- register(Collections.<ResourceID>emptyList());
-
- // send an AcceptOffers message as the LaunchCoordinator would
- // to launch task1 onto slave1 with offer1
- Protos.TaskInfo task1info = Protos.TaskInfo.newBuilder()
- .setTaskId(task1).setName("").setSlaveId(slave1).build();
- AcceptOffers msg = new AcceptOffers(slave1host, singletonList(offer1), singletonList(launch(task1info)));
- resourceManager.tell(msg);
-
- // verify that the worker was persisted, the internal state was updated,
- // Mesos was asked to launch task1, and the task router was notified
- MesosWorkerStore.Worker worker1launched = worker1.launchWorker(slave1, slave1host);
- verify(workerStore).putWorker(worker1launched);
- assertThat(resourceManagerInstance.workersInNew.entrySet(), empty());
- assertThat(resourceManagerInstance.workersInLaunch, hasEntry(extractResourceID(task1), worker1launched));
- resourceManagerInstance.taskRouter.expectMsg(
- new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker1launched)));
- verify(schedulerDriver).acceptOffers(msg.offerIds(), msg.operations(), msg.filters());
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
- };
- }};
- }
-
- /**
- * Test status handling.
- */
- @Test
- public void testStatusHandling() {
- new Context() {{
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
- initialize();
- register(Collections.<ResourceID>emptyList());
-
- // Verify that the RM forwards status updates to the reconciliation coordinator and task router.
- resourceManager.tell(new StatusUpdate(Protos.TaskStatus.newBuilder()
- .setTaskId(task1).setSlaveId(slave1).setState(Protos.TaskState.TASK_LOST).build()),
- resourceManager);
- resourceManagerInstance.reconciliationCoordinator.expectMsgClass(StatusUpdate.class);
- resourceManagerInstance.taskRouter.expectMsgClass(StatusUpdate.class);
- }
- };
- }};
- }
-
- /**
- * Test unplanned task failure of a pending worker.
- */
- @Test
- public void testPendingWorkerFailed() {
- new Context() {{
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
- try {
- // set the initial persistent state with a launched worker that hasn't yet registered
- MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
- when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
- when(workerStore.recoverWorkers()).thenReturn(singletonList(worker1launched));
- initialize();
- register(Collections.<ResourceID>emptyList());
-
- // tell the RM that a task failed (and prepare a replacement task)
- when(workerStore.newTaskID()).thenReturn(task2);
- when(workerStore.removeWorker(task1)).thenReturn(true);
- resourceManager.tell(new SetWorkerPoolSize(1), jobManager);
- resourceManager.tell(new TaskMonitor.TaskTerminated(task1, Protos.TaskStatus.newBuilder()
- .setTaskId(task1).setSlaveId(slave1).setState(Protos.TaskState.TASK_FAILED).build()));
-
- // verify that the instance state was updated
- assertThat(resourceManagerInstance.workersInLaunch.entrySet(), empty());
- verify(workerStore).newTaskID();
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
- };
- }};
- }
-
- /**
- * Test unplanned task failure of a registered worker.
- */
- @Test
- public void testRegisteredWorkerFailed() {
- new Context() {{
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
- try {
- // set the initial persistent state with a launched & registered worker
- MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
- when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
- when(workerStore.recoverWorkers()).thenReturn(singletonList(worker1launched));
- initialize();
- register(singletonList(extractResourceID(task1)));
-
- // tell the RM that a task failed (and prepare a replacement task)
- when(workerStore.newTaskID()).thenReturn(task2);
- when(workerStore.removeWorker(task1)).thenReturn(true);
- resourceManager.tell(new SetWorkerPoolSize(1), jobManager);
- resourceManager.tell(new TaskMonitor.TaskTerminated(task1, Protos.TaskStatus.newBuilder()
- .setTaskId(task1).setSlaveId(slave1).setState(Protos.TaskState.TASK_FAILED).build()));
-
- // verify that the instance state was updated and a replacement was created
- assertThat(resourceManagerInstance.workersInLaunch.entrySet(), empty());
- expectMsgClass(ResourceRemoved.class);
- verify(workerStore).newTaskID();
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
- };
- }};
- }
-
- /**
- * Test cluster stop handling.
- */
- @Test
- public void testStopApplication() {
- new Context() {{
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
- try {
- initialize();
- register(Collections.<ResourceID>emptyList());
- watch(resourceManager.actor());
- resourceManager.tell(new StopCluster(ApplicationStatus.SUCCEEDED, ""), resourceManager);
-
- // verify that the Mesos framework is shutdown
- verify(schedulerDriver).stop(false);
- verify(workerStore).stop(true);
- expectTerminated(resourceManager.actor());
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
- };
- }};
- }
-
- // ------------- connectivity tests -----------------------------
-
- /**
- * Test Mesos registration handling.
- */
- @Test
- public void testRegistered() {
- new Context() {{
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
- try {
- initialize();
- register(Collections.<ResourceID>emptyList());
-
- Protos.MasterInfo masterInfo = Protos.MasterInfo.newBuilder()
- .setId("master1").setIp(0).setPort(5050).build();
- resourceManager.tell(new Registered(framework1, masterInfo), resourceManager);
-
- verify(workerStore).setFrameworkID(Option.apply(framework1));
- resourceManagerInstance.connectionMonitor.expectMsgClass(Registered.class);
- resourceManagerInstance.reconciliationCoordinator.expectMsgClass(Registered.class);
- resourceManagerInstance.launchCoordinator.expectMsgClass(Registered.class);
- resourceManagerInstance.taskRouter.expectMsgClass(Registered.class);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
- };
- }};
- }
-
-
- /**
- * Test Mesos re-registration handling.
- */
- @Test
- public void testReRegistered() {
- new Context() {{
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
- try {
- when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
- initialize();
- register(Collections.<ResourceID>emptyList());
-
- Protos.MasterInfo masterInfo = Protos.MasterInfo.newBuilder()
- .setId("master1").setIp(0).setPort(5050).build();
- resourceManager.tell(new ReRegistered(masterInfo), resourceManager);
-
- resourceManagerInstance.connectionMonitor.expectMsgClass(ReRegistered.class);
- resourceManagerInstance.reconciliationCoordinator.expectMsgClass(ReRegistered.class);
- resourceManagerInstance.launchCoordinator.expectMsgClass(ReRegistered.class);
- resourceManagerInstance.taskRouter.expectMsgClass(ReRegistered.class);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
- };
- }};
- }
-
- /**
- * Test Mesos disconnection handling.
- */
- @Test
- public void testDisconnected() {
- new Context() {{
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
- try {
- when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
- initialize();
- register(Collections.<ResourceID>emptyList());
-
- resourceManager.tell(new Disconnected(), resourceManager);
-
- resourceManagerInstance.connectionMonitor.expectMsgClass(Disconnected.class);
- resourceManagerInstance.reconciliationCoordinator.expectMsgClass(Disconnected.class);
- resourceManagerInstance.launchCoordinator.expectMsgClass(Disconnected.class);
- resourceManagerInstance.taskRouter.expectMsgClass(Disconnected.class);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
- };
- }};
- }
-
- /**
- * Test Mesos scheduler error.
- */
- @Test
- public void testError() {
- new Context() {{
- new Within(duration("10 seconds")) {
- @Override
- protected void run() {
- try {
- when(workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
- initialize();
- register(Collections.<ResourceID>emptyList());
-
- watch(resourceManager.actor());
- resourceManager.tell(new Error("test"), resourceManager);
- expectTerminated(resourceManager.actor());
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
- };
- }};
- }
-}
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 5c9aa46..c5d053c 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -106,8 +106,8 @@ import java.util.concurrent.TimeUnit;
import scala.Option;
import static java.util.Collections.singletonList;
-import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractGoalState;
-import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractResourceID;
+import static org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager.extractGoalState;
+import static org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager.extractResourceID;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;