You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/11/08 00:11:31 UTC
[3/3] incubator-aurora git commit: Simplify management of the driver
lifecycle using AbstractIdleService.
Simplify management of the driver lifecycle using AbstractIdleService.
Bugs closed: AURORA-920
Reviewed at https://reviews.apache.org/r/27746/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/66bd6fec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/66bd6fec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/66bd6fec
Branch: refs/heads/master
Commit: 66bd6fec9340eade40a61784ed464c3e94b9d784
Parents: 03cb0d1
Author: Bill Farner <wf...@apache.org>
Authored: Fri Nov 7 15:08:35 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Fri Nov 7 15:08:35 2014 -0800
----------------------------------------------------------------------
config/legacy_untested_classes.txt | 2 +
.../org/apache/aurora/scheduler/Driver.java | 157 ---------
.../apache/aurora/scheduler/DriverFactory.java | 185 -----------
.../aurora/scheduler/MesosSchedulerImpl.java | 252 --------------
.../aurora/scheduler/MesosTaskFactory.java | 156 ---------
.../aurora/scheduler/SchedulerLifecycle.java | 42 +--
.../aurora/scheduler/SchedulerModule.java | 16 -
.../apache/aurora/scheduler/app/AppModule.java | 2 +
.../aurora/scheduler/app/SchedulerMain.java | 11 +-
.../aurora/scheduler/async/AsyncModule.java | 2 +
.../scheduler/async/GcExecutorLauncher.java | 2 +-
.../scheduler/async/JobUpdateHistoryPruner.java | 1 -
.../aurora/scheduler/async/KillRetry.java | 2 +-
.../aurora/scheduler/async/OfferQueue.java | 2 +-
.../aurora/scheduler/async/TaskTimeout.java | 62 ++--
.../mesos/CommandLineDriverSettingsModule.java | 159 +++++++++
.../apache/aurora/scheduler/mesos/Driver.java | 57 ++++
.../aurora/scheduler/mesos/DriverFactory.java | 32 ++
.../scheduler/mesos/DriverFactoryImpl.java | 44 +++
.../aurora/scheduler/mesos/DriverSettings.java | 53 +++
.../scheduler/mesos/LibMesosLoadingModule.java | 26 ++
.../scheduler/mesos/MesosSchedulerImpl.java | 253 ++++++++++++++
.../scheduler/mesos/MesosTaskFactory.java | 158 +++++++++
.../scheduler/mesos/SchedulerDriverModule.java | 45 +++
.../scheduler/mesos/SchedulerDriverService.java | 141 ++++++++
.../scheduler/state/StateManagerImpl.java | 2 +-
.../aurora/scheduler/state/StateModule.java | 4 +-
.../aurora/scheduler/state/TaskAssigner.java | 2 +-
.../scheduler/storage/log/EntrySerializer.java | 1 +
.../storage/log/StreamManagerImpl.java | 1 +
.../aurora/scheduler/DriverFactoryImplTest.java | 65 ----
.../org/apache/aurora/scheduler/DriverTest.java | 99 ------
.../scheduler/MesosSchedulerImplTest.java | 330 ------------------
.../scheduler/MesosTaskFactoryImplTest.java | 104 ------
.../scheduler/SchedulerLifecycleTest.java | 32 +-
.../aurora/scheduler/app/SchedulerIT.java | 31 +-
.../aurora/scheduler/app/local/FakeMaster.java | 45 ++-
.../scheduler/app/local/LocalSchedulerMain.java | 32 +-
.../aurora/scheduler/async/AsyncModuleTest.java | 2 +-
.../scheduler/async/GcExecutorLauncherTest.java | 2 +-
.../aurora/scheduler/async/KillRetryTest.java | 2 +-
.../scheduler/async/OfferQueueImplTest.java | 2 +-
.../aurora/scheduler/async/TaskGroupsTest.java | 1 -
.../scheduler/async/TaskSchedulerTest.java | 2 +-
.../aurora/scheduler/async/TaskTimeoutTest.java | 46 ++-
.../CommandLineDriverSettingsModuleTest.java | 63 ++++
.../scheduler/mesos/MesosSchedulerImplTest.java | 331 +++++++++++++++++++
.../mesos/MesosTaskFactoryImplTest.java | 105 ++++++
.../mesos/SchedulerDriverServiceTest.java | 148 +++++++++
.../scheduler/quota/QuotaManagerImplTest.java | 1 -
.../scheduler/sla/MetricCalculatorTest.java | 1 -
.../scheduler/state/StateManagerImplTest.java | 2 +-
.../scheduler/state/TaskAssignerImplTest.java | 2 +-
.../aurora/scheduler/updater/JobUpdaterIT.java | 2 +-
54 files changed, 1805 insertions(+), 1517 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/config/legacy_untested_classes.txt
----------------------------------------------------------------------
diff --git a/config/legacy_untested_classes.txt b/config/legacy_untested_classes.txt
index f1d1921..33c1d6e 100644
--- a/config/legacy_untested_classes.txt
+++ b/config/legacy_untested_classes.txt
@@ -53,6 +53,8 @@ org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule
org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule$3
org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule$4
org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule$5
+org/apache/aurora/scheduler/mesos/DriverFactoryImpl
+org/apache/aurora/scheduler/mesos/LibMesosLoadingModule
org/apache/aurora/scheduler/state/MaintenanceController$MaintenanceControllerImpl$4
org/apache/aurora/scheduler/state/MaintenanceController$MaintenanceControllerImpl$8
org/apache/aurora/scheduler/stats/AsyncStatsModule$OfferAdapter$1
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/Driver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/Driver.java b/src/main/java/org/apache/aurora/scheduler/Driver.java
deleted file mode 100644
index b07378f..0000000
--- a/src/main/java/org/apache/aurora/scheduler/Driver.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler;
-
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.Atomics;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.util.StateMachine;
-
-import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.apache.mesos.SchedulerDriver;
-
-import static org.apache.mesos.Protos.Status.DRIVER_RUNNING;
-
-/**
- * Wraps the mesos Scheduler driver to ensure its used in a valid lifecycle; namely:
- * <pre>
- * (run -> kill*)? -> stop*
- * </pre>
- *
- * Also ensures the driver is only asked for when needed.
- */
-public interface Driver {
-
- /**
- * Launches a task.
- *
- * @param offerId ID of the resource offer to accept with the task.
- * @param task Task to launch.
- */
- void launchTask(OfferID offerId, TaskInfo task);
-
- /**
- * Declines a resource offer.
- *
- * @param offerId ID of the offer to decline.
- */
- void declineOffer(OfferID offerId);
-
- /**
- * Sends a kill task request for the given {@code taskId} to the mesos master.
- *
- * @param taskId The id of the task to kill.
- */
- void killTask(String taskId);
-
- /**
- * Stops the underlying driver if it is running, otherwise does nothing.
- */
- void stop();
-
- interface SettableDriver extends Driver {
- void initialize(SchedulerDriver driver);
- }
-
- /**
- * Mesos driver implementation.
- */
- class DriverImpl implements SettableDriver {
- private static final Logger LOG = Logger.getLogger(Driver.class.getName());
-
- /**
- * Driver states.
- */
- enum State {
- INIT,
- RUNNING,
- STOPPED
- }
-
- private final StateMachine<State> stateMachine;
- private final AtomicReference<SchedulerDriver> driverRef = Atomics.newReference();
- private final AtomicLong killFailures = Stats.exportLong("scheduler_driver_kill_failures");
-
- /**
- * Creates a driver manager that will only ask for the underlying mesos driver when actually
- * needed.
- */
- @Inject
- DriverImpl() {
- this.stateMachine =
- StateMachine.<State>builder("scheduler_driver")
- .initialState(State.INIT)
- .addState(State.INIT, State.RUNNING, State.STOPPED)
- .addState(State.RUNNING, State.STOPPED)
- .logTransitions()
- .throwOnBadTransition(true)
- .build();
- }
-
- private synchronized SchedulerDriver get(State expected) {
- // TODO(William Farner): Formalize the failure case here by throwing a checked exception.
- stateMachine.checkState(expected);
- return Objects.requireNonNull(driverRef.get());
- }
-
- @Override
- public void launchTask(OfferID offerId, TaskInfo task) {
- get(State.RUNNING).launchTasks(ImmutableList.of(offerId), ImmutableList.of(task));
- }
-
- @Override
- public void declineOffer(OfferID offerId) {
- get(State.RUNNING).declineOffer(offerId);
- }
-
- @Override
- public void initialize(SchedulerDriver driver) {
- Objects.requireNonNull(driver);
- stateMachine.checkState(State.INIT);
- Preconditions.checkState(driverRef.compareAndSet(null, driver));
- stateMachine.transition(State.RUNNING);
- }
-
- @Override
- public synchronized void stop() {
- if (stateMachine.getState() == State.RUNNING) {
- SchedulerDriver driver = get(State.RUNNING);
- driver.stop(true /* failover */);
- stateMachine.transition(State.STOPPED);
- }
- }
-
- @Override
- public void killTask(String taskId) {
- SchedulerDriver driver = get(State.RUNNING);
- Protos.Status status = driver.killTask(Protos.TaskID.newBuilder().setValue(taskId).build());
-
- if (status != DRIVER_RUNNING) {
- LOG.severe(String.format("Attempt to kill task %s failed with code %s",
- taskId, status));
- killFailures.incrementAndGet();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/DriverFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/DriverFactory.java b/src/main/java/org/apache/aurora/scheduler/DriverFactory.java
deleted file mode 100644
index 9cc04a8..0000000
--- a/src/main/java/org/apache/aurora/scheduler/DriverFactory.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-import javax.inject.Inject;
-import javax.inject.Provider;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.protobuf.ByteString;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.args.constraints.NotNull;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-
-import org.apache.mesos.MesosSchedulerDriver;
-import org.apache.mesos.Protos.Credential;
-import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.FrameworkInfo;
-import org.apache.mesos.Scheduler;
-import org.apache.mesos.SchedulerDriver;
-
-/**
- * Factory to create scheduler driver instances.
- */
-public interface DriverFactory extends Function<String, SchedulerDriver> {
-
- class DriverFactoryImpl implements DriverFactory {
- private static final Logger LOG = Logger.getLogger(DriverFactory.class.getName());
-
- @NotNull
- @CmdLine(name = "mesos_master_address",
- help = "Address for the mesos master, can be a socket address or zookeeper path.")
- private static final Arg<String> MESOS_MASTER_ADDRESS = Arg.create();
-
- @VisibleForTesting
- static final String PRINCIPAL_KEY = "aurora_authentication_principal";
- @VisibleForTesting
- static final String SECRET_KEY = "aurora_authentication_secret";
- @CmdLine(name = "framework_authentication_file",
- help = "Properties file which contains framework credentials to authenticate with Mesos"
- + "master. Must contain the properties '" + PRINCIPAL_KEY + "' and "
- + "'" + SECRET_KEY + "'.")
- private static final Arg<File> FRAMEWORK_AUTHENTICATION_FILE = Arg.create();
-
- @CmdLine(name = "framework_failover_timeout",
- help = "Time after which a framework is considered deleted. SHOULD BE VERY HIGH.")
- private static final Arg<Amount<Long, Time>> FRAMEWORK_FAILOVER_TIMEOUT =
- Arg.create(Amount.of(21L, Time.DAYS));
-
- /**
- * Require Mesos slaves to have checkpointing enabled. Slaves with checkpointing enabled will
- * attempt to write checkpoints when required by a task's framework. These checkpoints allow
- * executors to be reattached rather than killed when a slave is restarted.
- *
- * This flag is dangerous! When enabled tasks will not launch on slaves without checkpointing
- * enabled.
- *
- * Behavior is as follows:
- * (Scheduler -require_slave_checkpoint=true, Slave --checkpoint=true):
- * Tasks will launch. Checkpoints will be written.
- * (Scheduler -require_slave_checkpoint=true, Slave --checkpoint=false):
- * Tasks WILL NOT launch.
- * (Scheduler -require_slave_checkpoint=false, Slave --checkpoint=true):
- * Tasks will launch. Checkpoints will not be written.
- * (Scheduler -require_slave_checkpoint=false, Slave --checkpoint=false):
- * Tasks will launch. Checkpoints will not be written.
- *
- * TODO(ksweeney): Remove warning table after https://issues.apache.org/jira/browse/MESOS-444
- * is resolved.
- */
- @CmdLine(name = "require_slave_checkpoint",
- help = "DANGEROUS! Require Mesos slaves to have checkpointing enabled. When enabled a "
- + "slave restart should not kill executors, but the scheduler will not be able to "
- + "launch tasks on slaves without --checkpoint=true in their command lines. See "
- + "DriverFactory.java for more information.")
- private static final Arg<Boolean> REQUIRE_SLAVE_CHECKPOINT = Arg.create(false);
-
- @CmdLine(name = "executor_user",
- help = "User to start the executor. Defaults to \"root\". "
- + "Set this to an unprivileged user if the mesos master was started with "
- + "\"--no-root_submissions\". If set to anything other than \"root\", the executor "
- + "will ignore the \"role\" setting for jobs since it can't use setuid() anymore. "
- + "This means that all your jobs will run under the specified user and the user has "
- + "to exist on the mesos slaves.")
- private static final Arg<String> EXECUTOR_USER = Arg.create("root");
-
- private static final String TWITTER_FRAMEWORK_NAME = "TwitterScheduler";
-
- private final Provider<Scheduler> scheduler;
-
- @Inject
- DriverFactoryImpl(Provider<Scheduler> scheduler) {
- this.scheduler = Objects.requireNonNull(scheduler);
- }
-
- @VisibleForTesting
- static Properties parseCredentials(InputStream credentialsStream) {
- Properties properties = new Properties();
- try {
- properties.load(credentialsStream);
- } catch (IOException e) {
- LOG.severe("Unable to load authentication file");
- throw Throwables.propagate(e);
- }
- Preconditions.checkState(properties.containsKey(PRINCIPAL_KEY),
- "The framework authentication file is missing the key: %s", PRINCIPAL_KEY);
- Preconditions.checkState(properties.containsKey(SECRET_KEY),
- "The framework authentication file is missing the key: %s", SECRET_KEY);
- return properties;
- }
-
- @Override
- public SchedulerDriver apply(@Nullable String frameworkId) {
- LOG.info("Connecting to mesos master: " + MESOS_MASTER_ADDRESS.get());
-
- FrameworkInfo.Builder frameworkInfo = FrameworkInfo.newBuilder()
- .setUser(EXECUTOR_USER.get())
- .setName(TWITTER_FRAMEWORK_NAME)
- .setCheckpoint(REQUIRE_SLAVE_CHECKPOINT.get())
- .setFailoverTimeout(FRAMEWORK_FAILOVER_TIMEOUT.get().as(Time.SECONDS));
-
- if (frameworkId == null) {
- LOG.warning("Did not find a persisted framework ID, connecting as a new framework.");
- } else {
- LOG.info("Found persisted framework ID: " + frameworkId);
- frameworkInfo.setId(FrameworkID.newBuilder().setValue(frameworkId));
- }
-
- if (FRAMEWORK_AUTHENTICATION_FILE.hasAppliedValue()) {
- Properties properties;
- try {
- properties = parseCredentials(new FileInputStream(FRAMEWORK_AUTHENTICATION_FILE.get()));
- } catch (FileNotFoundException e) {
- LOG.severe("Authentication File not Found");
- throw Throwables.propagate(e);
- }
-
- LOG.info(String.format("Connecting to master using authentication (principal: %s).",
- properties.get(PRINCIPAL_KEY)));
-
- Credential credential = Credential.newBuilder()
- .setPrincipal(properties.getProperty(PRINCIPAL_KEY))
- .setSecret(ByteString.copyFromUtf8(properties.getProperty(SECRET_KEY)))
- .build();
-
- return new MesosSchedulerDriver(
- scheduler.get(),
- frameworkInfo.build(),
- MESOS_MASTER_ADDRESS.get(),
- credential);
- } else {
- LOG.warning("Connecting to master without authentication!");
- return new MesosSchedulerDriver(
- scheduler.get(),
- frameworkInfo.build(),
- MESOS_MASTER_ADDRESS.get());
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
deleted file mode 100644
index c424ecd..0000000
--- a/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-import javax.inject.Qualifier;
-
-import com.google.common.base.Preconditions;
-import com.twitter.common.application.Lifecycle;
-import com.twitter.common.inject.TimedInterceptor.Timed;
-import com.twitter.common.stats.Stats;
-
-import org.apache.aurora.GuiceUtils.AllowUnchecked;
-import org.apache.aurora.scheduler.base.Conversions;
-import org.apache.aurora.scheduler.base.SchedulerException;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.mesos.Protos.ExecutorID;
-import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.MasterInfo;
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskStatus;
-import org.apache.mesos.Scheduler;
-import org.apache.mesos.SchedulerDriver;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-import static java.util.Objects.requireNonNull;
-
-/**
- * Location for communication with mesos.
- */
-class MesosSchedulerImpl implements Scheduler {
- private static final Logger LOG = Logger.getLogger(MesosSchedulerImpl.class.getName());
-
- private final AtomicLong totalResourceOffers = Stats.exportLong("scheduler_resource_offers");
- private final AtomicLong totalFailedStatusUpdates = Stats.exportLong("scheduler_status_updates");
- private final AtomicLong totalFrameworkDisconnects =
- Stats.exportLong("scheduler_framework_disconnects");
- private final AtomicLong totalFrameworkReregisters =
- Stats.exportLong("scheduler_framework_reregisters");
- private final AtomicLong totalLostExecutors = Stats.exportLong("scheduler_lost_executors");
-
- private final List<TaskLauncher> taskLaunchers;
-
- private final Storage storage;
- private final Lifecycle lifecycle;
- private final EventSink eventSink;
- private final Executor executor;
- private volatile boolean isRegistered = false;
-
- /**
- * Binding annotation for the executor the incoming Mesos message handler uses.
- */
- @Qualifier
- @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
- @interface SchedulerExecutor { }
-
- /**
- * Creates a new scheduler.
- *
- * @param storage Store to save host attributes into.
- * @param lifecycle Application lifecycle manager.
- * @param taskLaunchers Task launchers, which will be used in order. Calls to
- * {@link TaskLauncher#willUse(Offer)} and
- * {@link TaskLauncher#statusUpdate(TaskStatus)} are propagated to provided
- * launchers, ceasing after the first match (based on a return value of
- * {@code true}.
- * @param eventSink Pubsub sink to send driver status changes to.
- * @param executor Executor for async work
- */
- @Inject
- public MesosSchedulerImpl(
- Storage storage,
- final Lifecycle lifecycle,
- List<TaskLauncher> taskLaunchers,
- EventSink eventSink,
- @SchedulerExecutor Executor executor) {
-
- this.storage = requireNonNull(storage);
- this.lifecycle = requireNonNull(lifecycle);
- this.taskLaunchers = requireNonNull(taskLaunchers);
- this.eventSink = requireNonNull(eventSink);
- this.executor = requireNonNull(executor);
- }
-
- @Override
- public void slaveLost(SchedulerDriver schedulerDriver, SlaveID slaveId) {
- LOG.info("Received notification of lost slave: " + slaveId);
- }
-
- @Override
- public void registered(
- SchedulerDriver driver,
- final FrameworkID frameworkId,
- MasterInfo masterInfo) {
-
- LOG.info("Registered with ID " + frameworkId + ", master: " + masterInfo);
-
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override
- protected void execute(MutableStoreProvider storeProvider) {
- storeProvider.getSchedulerStore().saveFrameworkId(frameworkId.getValue());
- }
- });
- isRegistered = true;
- eventSink.post(new DriverRegistered());
- }
-
- @Override
- public void disconnected(SchedulerDriver schedulerDriver) {
- LOG.warning("Framework disconnected.");
- totalFrameworkDisconnects.incrementAndGet();
- eventSink.post(new DriverDisconnected());
- }
-
- @Override
- public void reregistered(SchedulerDriver schedulerDriver, MasterInfo masterInfo) {
- LOG.info("Framework re-registered with master " + masterInfo);
- totalFrameworkReregisters.incrementAndGet();
- }
-
- @Timed("scheduler_resource_offers")
- @Override
- public void resourceOffers(SchedulerDriver driver, final List<Offer> offers) {
- Preconditions.checkState(isRegistered, "Must be registered before receiving offers.");
-
- // Store all host attributes in a single write operation to prevent other threads from
- // securing the storage lock between saves. We also save the host attributes before passing
- // offers elsewhere to ensure that host attributes are available before attempting to
- // schedule tasks associated with offers.
- // TODO(wfarner): Reconsider the requirements here, augment the task scheduler to skip over
- // offers when the host attributes cannot be found. (AURORA-137)
-
- executor.execute(new Runnable() {
- @Override
- public void run() {
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override
- protected void execute(MutableStoreProvider storeProvider) {
- for (final Offer offer : offers) {
- storeProvider.getAttributeStore()
- .saveHostAttributes(Conversions.getAttributes(offer));
- }
- }
- });
-
- for (Offer offer : offers) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, String.format("Received offer: %s", offer));
- }
- totalResourceOffers.incrementAndGet();
- for (TaskLauncher launcher : taskLaunchers) {
- if (launcher.willUse(offer)) {
- break;
- }
- }
- }
- }
- });
- }
-
- @Override
- public void offerRescinded(SchedulerDriver schedulerDriver, OfferID offerId) {
- LOG.info("Offer rescinded: " + offerId);
- for (TaskLauncher launcher : taskLaunchers) {
- launcher.cancelOffer(offerId);
- }
- }
-
- @AllowUnchecked
- @Timed("scheduler_status_update")
- @Override
- public void statusUpdate(SchedulerDriver driver, TaskStatus status) {
- String info = status.hasData() ? status.getData().toStringUtf8() : null;
- String infoMsg = info == null ? "" : " with info " + info;
- String coreMsg = status.hasMessage() ? " with core message " + status.getMessage() : "";
- LOG.info("Received status update for task " + status.getTaskId().getValue()
- + " in state " + status.getState() + infoMsg + coreMsg);
-
- try {
- for (TaskLauncher launcher : taskLaunchers) {
- if (launcher.statusUpdate(status)) {
- return;
- }
- }
- } catch (SchedulerException e) {
- LOG.log(Level.SEVERE, "Status update failed due to scheduler exception: " + e, e);
- // We re-throw the exception here in an effort to NACK the status update and trigger an
- // abort of the driver. Previously we directly issued driver.abort(), but the re-entrancy
- // guarantees of that are uncertain (and we believe it was not working). However, this
- // was difficult to discern since logging is unreliable during JVM shutdown and we would not
- // see the above log message.
- throw e;
- }
-
- LOG.warning("Unhandled status update " + status);
- totalFailedStatusUpdates.incrementAndGet();
- }
-
- @Override
- public void error(SchedulerDriver driver, String message) {
- LOG.severe("Received error message: " + message);
- lifecycle.shutdown();
- }
-
- @Override
- public void executorLost(SchedulerDriver schedulerDriver, ExecutorID executorID, SlaveID slaveID,
- int status) {
-
- LOG.info("Lost executor " + executorID);
- totalLostExecutors.incrementAndGet();
- }
-
- @Timed("scheduler_framework_message")
- @Override
- public void frameworkMessage(
- SchedulerDriver driver,
- ExecutorID executorID,
- SlaveID slave,
- byte[] data) {
-
- LOG.warning("Ignoring framework message.");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java
deleted file mode 100644
index ad07bca..0000000
--- a/src/main/java/org/apache/aurora/scheduler/MesosTaskFactory.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler;
-
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.protobuf.ByteString;
-import com.twitter.common.quantity.Data;
-
-import org.apache.aurora.Protobufs;
-import org.apache.aurora.codec.ThriftBinaryCodec;
-import org.apache.aurora.scheduler.base.CommandUtil;
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.SchedulerException;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.configuration.Resources;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.mesos.Protos.ExecutorID;
-import org.apache.mesos.Protos.ExecutorInfo;
-import org.apache.mesos.Protos.Resource;
-import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.Protos.TaskInfo;
-
-import static java.util.Objects.requireNonNull;
-
-import static com.twitter.common.base.MorePreconditions.checkNotBlank;
-
-/**
- * A factory to create mesos task objects.
- */
-public interface MesosTaskFactory {
-
- /**
- * Creates a mesos task object.
- *
- * @param task Assigned task to translate into a task object.
- * @param slaveId Id of the slave the task is being assigned to.
- * @return A new task.
- * @throws SchedulerException If the task could not be encoded.
- */
- TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException;
-
- class ExecutorConfig {
- private final String executorPath;
-
- public ExecutorConfig(String executorPath) {
- this.executorPath = checkNotBlank(executorPath);
- }
-
- String getExecutorPath() {
- return executorPath;
- }
- }
-
- class MesosTaskFactoryImpl implements MesosTaskFactory {
- private static final Logger LOG = Logger.getLogger(MesosTaskFactoryImpl.class.getName());
- private static final String EXECUTOR_PREFIX = "thermos-";
-
- /**
- * Name to associate with task executors.
- */
- @VisibleForTesting
- static final String EXECUTOR_NAME = "aurora.task";
-
- private final String executorPath;
-
- @Inject
- MesosTaskFactoryImpl(ExecutorConfig executorConfig) {
- this.executorPath = executorConfig.getExecutorPath();
- }
-
- @VisibleForTesting
- static ExecutorID getExecutorId(String taskId) {
- return ExecutorID.newBuilder().setValue(EXECUTOR_PREFIX + taskId).build();
- }
-
- public static String getJobSourceName(IJobKey jobkey) {
- return String.format("%s.%s.%s", jobkey.getRole(), jobkey.getEnvironment(), jobkey.getName());
- }
-
- public static String getJobSourceName(ITaskConfig task) {
- return getJobSourceName(task.getJob());
- }
-
- public static String getInstanceSourceName(ITaskConfig task, int instanceId) {
- return String.format("%s.%s", getJobSourceName(task), instanceId);
- }
-
- @Override
- public TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException {
- requireNonNull(task);
- byte[] taskInBytes;
- try {
- taskInBytes = ThriftBinaryCodec.encode(task.newBuilder());
- } catch (ThriftBinaryCodec.CodingException e) {
- LOG.log(Level.SEVERE, "Unable to serialize task.", e);
- throw new SchedulerException("Internal error.", e);
- }
-
- ITaskConfig config = task.getTask();
- // TODO(wfarner): Re-evaluate if/why we need to continue handling unset assignedPorts field.
- List<Resource> resources = Resources.from(config)
- .toResourceList(task.isSetAssignedPorts()
- ? ImmutableSet.copyOf(task.getAssignedPorts().values())
- : ImmutableSet.<Integer>of());
-
- if (LOG.isLoggable(Level.FINE)) {
- LOG.fine("Setting task resources to "
- + Iterables.transform(resources, Protobufs.SHORT_TOSTRING));
- }
- TaskInfo.Builder taskBuilder =
- TaskInfo.newBuilder()
- .setName(JobKeys.canonicalString(Tasks.ASSIGNED_TO_JOB_KEY.apply(task)))
- .setTaskId(TaskID.newBuilder().setValue(task.getTaskId()))
- .setSlaveId(slaveId)
- .addAllResources(resources)
- .setData(ByteString.copyFrom(taskInBytes));
-
- ExecutorInfo executor = ExecutorInfo.newBuilder()
- .setCommand(CommandUtil.create(executorPath))
- .setExecutorId(getExecutorId(task.getTaskId()))
- .setName(EXECUTOR_NAME)
- .setSource(getInstanceSourceName(config, task.getInstanceId()))
- .addResources(
- Resources.makeMesosResource(Resources.CPUS, ResourceSlot.EXECUTOR_CPUS.get()))
- .addResources(Resources.makeMesosResource(
- Resources.RAM_MB,
- ResourceSlot.EXECUTOR_RAM.get().as(Data.MB)))
- .build();
- return taskBuilder
- .setExecutor(executor)
- .build();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
index 453f22a..e741913 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
@@ -30,7 +30,6 @@ import javax.inject.Inject;
import javax.inject.Qualifier;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
@@ -55,18 +54,14 @@ import com.twitter.common.zookeeper.ServerSet;
import com.twitter.common.zookeeper.SingletonService.LeaderControl;
import org.apache.aurora.GuavaUtils.ServiceManagerIface;
-import org.apache.aurora.scheduler.Driver.SettableDriver;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.Storage.MutateWork;
import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.Work;
import org.apache.aurora.scheduler.storage.StorageBackfill;
-import org.apache.mesos.Protos;
-import org.apache.mesos.SchedulerDriver;
import static java.util.Objects.requireNonNull;
@@ -125,18 +120,11 @@ public class SchedulerLifecycle implements EventSubscriber {
private final AtomicReference<LeaderControl> leaderControl = Atomics.newReference();
private final StateMachine<State> stateMachine;
- // The local driver reference, distinct from the global SettableDriver.
- // This is used to perform actions with the driver (i.e. invoke start(), join()),
- // which no other code should do. It also permits us to save the reference until we are ready to
- // make the driver ready by invoking SettableDriver.initialize().
- private final AtomicReference<SchedulerDriver> driverRef = Atomics.newReference();
-
@Inject
SchedulerLifecycle(
- DriverFactory driverFactory,
NonVolatileStorage storage,
Lifecycle lifecycle,
- SettableDriver driver,
+ Driver driver,
LeadingOptions leadingOptions,
ScheduledExecutorService executorService,
Clock clock,
@@ -146,7 +134,6 @@ public class SchedulerLifecycle implements EventSubscriber {
@SchedulerActive ServiceManagerIface schedulerActiveServiceManager) {
this(
- driverFactory,
storage,
lifecycle,
driver,
@@ -209,10 +196,9 @@ public class SchedulerLifecycle implements EventSubscriber {
@VisibleForTesting
SchedulerLifecycle(
- final DriverFactory driverFactory,
final NonVolatileStorage storage,
final Lifecycle lifecycle,
- final SettableDriver driver,
+ final Driver driver,
final DelayedActions delayedActions,
final Clock clock,
final EventSink eventSink,
@@ -220,7 +206,6 @@ public class SchedulerLifecycle implements EventSubscriber {
StatsProvider statsProvider,
final ServiceManagerIface schedulerActiveServiceManager) {
- requireNonNull(driverFactory);
requireNonNull(storage);
requireNonNull(lifecycle);
requireNonNull(driver);
@@ -276,17 +261,7 @@ public class SchedulerLifecycle implements EventSubscriber {
}
});
- final Optional<String> frameworkId = storage.consistentRead(
- new Work.Quiet<Optional<String>>() {
- @Override
- public Optional<String> apply(StoreProvider storeProvider) {
- return storeProvider.getSchedulerStore().fetchFrameworkId();
- }
- });
-
- // Save the prepared driver locally, but don't expose it until the registered callback is
- // received.
- driverRef.set(driverFactory.apply(frameworkId.orNull()));
+ driver.startAsync().awaitRunning();
delayedActions.onRegistrationTimeout(
new Runnable() {
@@ -308,9 +283,6 @@ public class SchedulerLifecycle implements EventSubscriber {
stateMachine.transition(State.DEAD);
}
});
-
- Protos.Status status = driverRef.get().start();
- LOG.info("Driver started with code " + status);
}
};
@@ -318,12 +290,10 @@ public class SchedulerLifecycle implements EventSubscriber {
@Override
public void execute(Transition<State> transition) {
registrationAcked.set(true);
- driver.initialize(driverRef.get());
delayedActions.blockingDriverJoin(new Runnable() {
@Override
public void run() {
- // Blocks until driver exits.
- driverRef.get().join();
+ driver.blockUntilStopped();
LOG.info("Driver exited, terminating lifecycle.");
stateMachine.transition(State.DEAD);
}
@@ -365,7 +335,7 @@ public class SchedulerLifecycle implements EventSubscriber {
// TODO(wfarner): Re-evaluate tear-down ordering here. Should the top-level shutdown
// be invoked first, or the underlying critical components?
- driver.stop();
+ driver.stopAsync().awaitTerminated();
storage.stop();
} finally {
lifecycle.shutdown();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
index 8d30640..72d3d60 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
@@ -15,7 +15,6 @@ package org.apache.aurora.scheduler;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Logger;
@@ -37,15 +36,12 @@ import com.twitter.common.quantity.Time;
import org.apache.aurora.GuavaUtils;
import org.apache.aurora.GuavaUtils.ServiceManagerIface;
-import org.apache.aurora.scheduler.Driver.DriverImpl;
-import org.apache.aurora.scheduler.Driver.SettableDriver;
import org.apache.aurora.scheduler.SchedulerLifecycle.LeadingOptions;
import org.apache.aurora.scheduler.SchedulerLifecycle.SchedulerActive;
import org.apache.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
import org.apache.aurora.scheduler.async.GcExecutorLauncher;
import org.apache.aurora.scheduler.base.AsyncUtil;
import org.apache.aurora.scheduler.events.PubsubEventModule;
-import org.apache.mesos.Scheduler;
/**
* Binding module for top-level scheduling logic.
@@ -66,21 +62,9 @@ public class SchedulerModule extends AbstractModule {
@Override
protected void configure() {
- bind(Driver.class).to(DriverImpl.class);
- bind(SettableDriver.class).to(DriverImpl.class);
- bind(DriverImpl.class).in(Singleton.class);
-
- bind(Scheduler.class).to(MesosSchedulerImpl.class);
- bind(MesosSchedulerImpl.class).in(Singleton.class);
-
bind(TaskIdGenerator.class).to(TaskIdGeneratorImpl.class);
-
bind(UserTaskLauncher.class).in(Singleton.class);
- // TODO(zmanji): Create singleThreadedExecutor (non-scheduled) variant.
- bind(Executor.class).annotatedWith(MesosSchedulerImpl.SchedulerExecutor.class)
- .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("SchedulerImpl-%d", LOG));
-
install(new PrivateModule() {
@Override
protected void configure() {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
index 9ec3f41..fef76f5 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
@@ -49,6 +49,7 @@ import org.apache.aurora.scheduler.async.AsyncModule;
import org.apache.aurora.scheduler.events.PubsubEventModule;
import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
import org.apache.aurora.scheduler.http.JettyServerModule;
+import org.apache.aurora.scheduler.mesos.SchedulerDriverModule;
import org.apache.aurora.scheduler.metadata.MetadataModule;
import org.apache.aurora.scheduler.quota.QuotaModule;
import org.apache.aurora.scheduler.sla.SlaModule;
@@ -115,6 +116,7 @@ class AppModule extends AbstractModule {
install(new MetadataModule());
install(new QuotaModule());
install(new JettyServerModule());
+ install(new SchedulerDriverModule());
install(new SchedulerModule());
install(new StateModule());
install(new SlaModule());
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
index 0487409..288e7cb 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
@@ -17,7 +17,6 @@ import java.net.InetSocketAddress;
import java.util.List;
import javax.inject.Inject;
-import javax.inject.Singleton;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
@@ -46,12 +45,12 @@ import com.twitter.common.zookeeper.guice.client.flagged.FlaggedClientConfig;
import org.apache.aurora.auth.CapabilityValidator;
import org.apache.aurora.auth.SessionValidator;
import org.apache.aurora.auth.UnsecureAuthModule;
-import org.apache.aurora.scheduler.DriverFactory;
-import org.apache.aurora.scheduler.DriverFactory.DriverFactoryImpl;
-import org.apache.aurora.scheduler.MesosTaskFactory.ExecutorConfig;
import org.apache.aurora.scheduler.SchedulerLifecycle;
import org.apache.aurora.scheduler.cron.quartz.CronModule;
import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule;
+import org.apache.aurora.scheduler.mesos.CommandLineDriverSettingsModule;
+import org.apache.aurora.scheduler.mesos.LibMesosLoadingModule;
+import org.apache.aurora.scheduler.mesos.MesosTaskFactory.ExecutorConfig;
import org.apache.aurora.scheduler.storage.backup.BackupModule;
import org.apache.aurora.scheduler.storage.db.DbModule;
import org.apache.aurora.scheduler.storage.db.MigrationModule;
@@ -157,8 +156,8 @@ public class SchedulerMain extends AbstractApplication {
return new AbstractModule() {
@Override
protected void configure() {
- bind(DriverFactory.class).to(DriverFactoryImpl.class);
- bind(DriverFactoryImpl.class).in(Singleton.class);
+ install(new CommandLineDriverSettingsModule());
+ install(new LibMesosLoadingModule());
install(new MesosLogStreamModule(zkClientConfig));
}
};
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index 9bc42d3..4e37f4c 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -46,6 +46,7 @@ import com.twitter.common.util.BackoffStrategy;
import com.twitter.common.util.Random;
import com.twitter.common.util.TruncatedBinaryBackoff;
+import org.apache.aurora.scheduler.SchedulerModule;
import org.apache.aurora.scheduler.async.GcExecutorLauncher.GcExecutorSettings;
import org.apache.aurora.scheduler.async.GcExecutorLauncher.RandomGcExecutorSettings;
import org.apache.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
@@ -254,6 +255,7 @@ public class AsyncModule extends AbstractModule {
}
});
PubsubEventModule.bindSubscriber(binder(), TaskTimeout.class);
+ SchedulerModule.addSchedulerActiveServiceBinding(binder()).to(TaskTimeout.class);
install(new PrivateModule() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
index feb70a9..79d8d8d 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
@@ -41,12 +41,12 @@ import org.apache.aurora.Protobufs;
import org.apache.aurora.codec.ThriftBinaryCodec;
import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
import org.apache.aurora.gen.comm.AdjustRetainedTasks;
-import org.apache.aurora.scheduler.Driver;
import org.apache.aurora.scheduler.TaskLauncher;
import org.apache.aurora.scheduler.base.CommandUtil;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.configuration.Resources;
+import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.mesos.Protos;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java
index 0b023a2..8e9a1dc 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java
@@ -21,7 +21,6 @@ import java.util.logging.Logger;
import javax.inject.Inject;
import com.google.common.base.Joiner;
-
import com.twitter.common.base.Command;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java b/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java
index e656c7e..d6b7fab 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java
@@ -26,10 +26,10 @@ import com.twitter.common.stats.StatsProvider;
import com.twitter.common.util.BackoffStrategy;
import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.Driver;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.storage.Storage;
import static java.util.Objects.requireNonNull;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
index 1a45d08..14c3e3a 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
@@ -37,10 +37,10 @@ import com.twitter.common.quantity.Time;
import com.twitter.common.stats.Stats;
import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.scheduler.Driver;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.events.PubsubEvent.HostMaintenanceStateChange;
+import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.state.MaintenanceController;
import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.OfferID;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
index 8914022..8217c51 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
@@ -16,7 +16,6 @@ package org.apache.aurora.scheduler.async;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
@@ -25,6 +24,7 @@ import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.AbstractIdleService;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.stats.StatsProvider;
@@ -40,10 +40,13 @@ import static java.util.Objects.requireNonNull;
* Observes task transitions and identifies tasks that are 'stuck' in a transient state. Stuck
* tasks will be transitioned to the LOST state.
*/
-class TaskTimeout implements EventSubscriber {
+class TaskTimeout extends AbstractIdleService implements EventSubscriber {
private static final Logger LOG = Logger.getLogger(TaskTimeout.class.getName());
@VisibleForTesting
+ static final Amount<Long, Time> NOT_STARTED_RETRY = Amount.of(5L, Time.SECONDS);
+
+ @VisibleForTesting
static final String TIMED_OUT_TASKS_COUNTER = "timed_out_tasks";
@VisibleForTesting
@@ -59,7 +62,7 @@ class TaskTimeout implements EventSubscriber {
private final ScheduledExecutorService executor;
private final StateManager stateManager;
- private final long timeoutMillis;
+ private final Amount<Long, Time> timeout;
private final AtomicLong timedOutTasks;
@Inject
@@ -71,7 +74,7 @@ class TaskTimeout implements EventSubscriber {
this.executor = requireNonNull(executor);
this.stateManager = requireNonNull(stateManager);
- this.timeoutMillis = timeout.as(Time.MILLISECONDS);
+ this.timeout = requireNonNull(timeout);
this.timedOutTasks = statsProvider.makeCounter(TIMED_OUT_TASKS_COUNTER);
}
@@ -79,6 +82,17 @@ class TaskTimeout implements EventSubscriber {
return TRANSIENT_STATES.contains(status);
}
+ @Override
+ protected void startUp() {
+ // No work to do here for startup, however we leverage the state tracking in
+ // AbstractIdleService.
+ }
+
+ @Override
+ protected void shutDown() {
+ // Nothing to do for shutting down.
+ }
+
@Subscribe
public void recordStateChange(TaskStateChange change) {
final String taskId = change.getTaskId();
@@ -88,24 +102,34 @@ class TaskTimeout implements EventSubscriber {
new Runnable() {
@Override
public void run() {
- // This query acts as a CAS by including the state that we expect the task to be in if
- // the timeout is still valid. Ideally, the future would have already been canceled,
- // but in the event of a state transition race, including transientState prevents an
- // unintended task timeout.
- // Note: This requires LOST transitions trigger Driver.killTask.
- if (stateManager.changeState(
- taskId,
- Optional.of(newState),
- ScheduleStatus.LOST,
- TIMEOUT_MESSAGE)) {
-
- LOG.info("Timeout reached for task " + taskId + ":" + taskId);
- timedOutTasks.incrementAndGet();
+ if (isRunning()) {
+ // This query acts as a CAS by including the state that we expect the task to be in
+ // if the timeout is still valid. Ideally, the future would have already been
+ // canceled, but in the event of a state transition race, including transientState
+ // prevents an unintended task timeout.
+ // Note: This requires LOST transitions trigger Driver.killTask.
+ if (stateManager.changeState(
+ taskId,
+ Optional.of(newState),
+ ScheduleStatus.LOST,
+ TIMEOUT_MESSAGE)) {
+
+ LOG.info("Timeout reached for task " + taskId + ":" + taskId);
+ timedOutTasks.incrementAndGet();
+ }
+ } else {
+ // Our service is not yet started. We don't want to lose track of the task, so
+ // we will try again later.
+ LOG.fine("Retrying timeout of task " + taskId + " in " + NOT_STARTED_RETRY);
+ executor.schedule(
+ this,
+ NOT_STARTED_RETRY.getValue(),
+ NOT_STARTED_RETRY.getUnit().getTimeUnit());
}
}
},
- timeoutMillis,
- TimeUnit.MILLISECONDS);
+ timeout.getValue(),
+ timeout.getUnit().getTimeUnit());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java b/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java
new file mode 100644
index 0000000..d099420
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.logging.Logger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.inject.AbstractModule;
+import com.google.protobuf.ByteString;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.args.constraints.NotNull;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+import org.apache.mesos.Protos;
+
+import static org.apache.mesos.Protos.FrameworkInfo;
+
+/**
+ * Creates and binds {@link DriverSettings} based on values found on the command line.
+ */
+public class CommandLineDriverSettingsModule extends AbstractModule {
+
+  private static final Logger LOG =
+      Logger.getLogger(CommandLineDriverSettingsModule.class.getName());
+
+  @NotNull
+  @CmdLine(name = "mesos_master_address",
+      help = "Address for the mesos master, can be a socket address or zookeeper path.")
+  private static final Arg<String> MESOS_MASTER_ADDRESS = Arg.create();
+
+  @VisibleForTesting
+  static final String PRINCIPAL_KEY = "aurora_authentication_principal";
+  @VisibleForTesting
+  static final String SECRET_KEY = "aurora_authentication_secret";
+
+  @CmdLine(name = "framework_authentication_file",
+      help = "Properties file which contains framework credentials to authenticate with Mesos "
+          + "master. Must contain the properties '" + PRINCIPAL_KEY + "' and "
+          + "'" + SECRET_KEY + "'.")
+  private static final Arg<File> FRAMEWORK_AUTHENTICATION_FILE = Arg.create();
+
+  @CmdLine(name = "framework_failover_timeout",
+      help = "Time after which a framework is considered deleted. SHOULD BE VERY HIGH.")
+  private static final Arg<Amount<Long, Time>> FRAMEWORK_FAILOVER_TIMEOUT =
+      Arg.create(Amount.of(21L, Time.DAYS));
+
+  /**
+   * Require Mesos slaves to have checkpointing enabled. Slaves with checkpointing enabled will
+   * attempt to write checkpoints when required by a task's framework. These checkpoints allow
+   * executors to be reattached rather than killed when a slave is restarted.
+   *
+   * This flag is dangerous! When enabled tasks will not launch on slaves without checkpointing
+   * enabled.
+   *
+   * Behavior is as follows:
+   * (Scheduler -require_slave_checkpoint=true, Slave --checkpoint=true):
+   *   Tasks will launch. Checkpoints will be written.
+   * (Scheduler -require_slave_checkpoint=true, Slave --checkpoint=false):
+   *   Tasks WILL NOT launch.
+   * (Scheduler -require_slave_checkpoint=false, Slave --checkpoint=true):
+   *   Tasks will launch. Checkpoints will not be written.
+   * (Scheduler -require_slave_checkpoint=false, Slave --checkpoint=false):
+   *   Tasks will launch. Checkpoints will not be written.
+   *
+   * TODO(ksweeney): Remove warning table after https://issues.apache.org/jira/browse/MESOS-444
+   * is resolved.
+   */
+  @CmdLine(name = "require_slave_checkpoint",
+      help = "DANGEROUS! Require Mesos slaves to have checkpointing enabled. When enabled a "
+          + "slave restart should not kill executors, but the scheduler will not be able to "
+          + "launch tasks on slaves without --checkpoint=true in their command lines. See "
+          + "CommandLineDriverSettingsModule.java for more information.")
+  private static final Arg<Boolean> REQUIRE_SLAVE_CHECKPOINT = Arg.create(false);
+
+  @CmdLine(name = "executor_user",
+      help = "User to start the executor. Defaults to \"root\". "
+          + "Set this to an unprivileged user if the mesos master was started with "
+          + "\"--no-root_submissions\". If set to anything other than \"root\", the executor "
+          + "will ignore the \"role\" setting for jobs since it can't use setuid() anymore. "
+          + "This means that all your jobs will run under the specified user and the user has "
+          + "to exist on the mesos slaves.")
+  private static final Arg<String> EXECUTOR_USER = Arg.create("root");
+
+  // TODO(wfarner): Figure out a way to change this without risk of fallout (MESOS-703).
+  private static final String TWITTER_FRAMEWORK_NAME = "TwitterScheduler";
+
+  /**
+   * Builds {@link DriverSettings} from the command-line arguments and binds it as an instance.
+   */
+  @Override
+  protected void configure() {
+    FrameworkInfo frameworkInfo = FrameworkInfo.newBuilder()
+        .setUser(EXECUTOR_USER.get())
+        .setName(TWITTER_FRAMEWORK_NAME)
+        .setCheckpoint(REQUIRE_SLAVE_CHECKPOINT.get())
+        .setFailoverTimeout(FRAMEWORK_FAILOVER_TIMEOUT.get().as(Time.SECONDS))
+        .build();
+    DriverSettings settings =
+        new DriverSettings(MESOS_MASTER_ADDRESS.get(), getCredentials(), frameworkInfo);
+    bind(DriverSettings.class).toInstance(settings);
+  }
+
+  /**
+   * Reads framework credentials from the file named by {@code -framework_authentication_file}.
+   *
+   * @return The credential built from the file's principal and secret, or absent when no value
+   *     was applied to the argument.
+   */
+  private static Optional<Protos.Credential> getCredentials() {
+    if (!FRAMEWORK_AUTHENTICATION_FILE.hasAppliedValue()) {
+      return Optional.absent();
+    }
+
+    Properties properties;
+    // try-with-resources closes the file even when parsing or validation fails.
+    try (InputStream credentialsStream =
+        new FileInputStream(FRAMEWORK_AUTHENTICATION_FILE.get())) {
+      properties = parseCredentials(credentialsStream);
+    } catch (FileNotFoundException e) {
+      LOG.severe("Authentication File not Found");
+      throw Throwables.propagate(e);
+    } catch (IOException e) {
+      // Only close() can raise a checked IOException here; parseCredentials wraps its own.
+      LOG.severe("Unable to close authentication file");
+      throw Throwables.propagate(e);
+    }
+
+    LOG.info(String.format("Connecting to master using authentication (principal: %s).",
+        properties.get(PRINCIPAL_KEY)));
+
+    return Optional.of(Protos.Credential.newBuilder()
+        .setPrincipal(properties.getProperty(PRINCIPAL_KEY))
+        .setSecret(ByteString.copyFromUtf8(properties.getProperty(SECRET_KEY)))
+        .build());
+  }
+
+  /**
+   * Loads and validates framework credentials from a properties-formatted stream.
+   *
+   * @param credentialsStream Properties-formatted input; left open for the caller to close.
+   * @return The loaded properties, which contain both {@link #PRINCIPAL_KEY} and
+   *     {@link #SECRET_KEY}.
+   * @throws IllegalStateException If either required key is missing.
+   */
+  @VisibleForTesting
+  static Properties parseCredentials(InputStream credentialsStream) {
+    Properties properties = new Properties();
+    try {
+      properties.load(credentialsStream);
+    } catch (IOException e) {
+      LOG.severe("Unable to load authentication file");
+      throw Throwables.propagate(e);
+    }
+    Preconditions.checkState(properties.containsKey(PRINCIPAL_KEY),
+        "The framework authentication file is missing the key: %s", PRINCIPAL_KEY);
+    Preconditions.checkState(properties.containsKey(SECRET_KEY),
+        "The framework authentication file is missing the key: %s", SECRET_KEY);
+    return properties;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java b/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java
new file mode 100644
index 0000000..c7e45a8
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import com.google.common.util.concurrent.Service;
+
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.TaskInfo;
+
+/**
+ * Wraps the mesos Scheduler driver to ensure it's used in a valid lifecycle; namely:
+ * <pre>
+ * (run -> kill*)? -> stop*
+ * </pre>
+ * Starting and stopping are driven through the inherited {@link Service} lifecycle methods.
+ * Also ensures the driver is only asked for when needed.
+ */
+public interface Driver extends Service {
+
+ /**
+ * Launches a task.
+ *
+ * @param offerId ID of the resource offer to accept with the task.
+ * @param task Task to launch.
+ */
+ void launchTask(OfferID offerId, TaskInfo task);
+
+ /**
+ * Declines a resource offer.
+ *
+ * @param offerId ID of the offer to decline.
+ */
+ void declineOffer(OfferID offerId);
+
+ /**
+ * Sends a kill task request for the given {@code taskId} to the mesos master.
+ *
+ * @param taskId The id of the task to kill.
+ */
+ void killTask(String taskId);
+
+ /**
+ * Blocks the calling thread until the driver is no longer active.
+ */
+ void blockUntilStopped();
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/mesos/DriverFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/DriverFactory.java b/src/main/java/org/apache/aurora/scheduler/mesos/DriverFactory.java
new file mode 100644
index 0000000..92d8924
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/DriverFactory.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import com.google.common.base.Optional;
+
+import org.apache.mesos.Protos;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+/**
+ * A layer over the {@link org.apache.mesos.MesosSchedulerDriver} constructor. This indirection is
+ * needed since that class statically loads libmesos; {@code credentials} may be absent.
+ */
+public interface DriverFactory {
+ SchedulerDriver create(
+ Scheduler scheduler,
+ Optional<Protos.Credential> credentials,
+ Protos.FrameworkInfo frameworkInfo,
+ String master);
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/mesos/DriverFactoryImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/DriverFactoryImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/DriverFactoryImpl.java
new file mode 100644
index 0000000..db7aa74
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/DriverFactoryImpl.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import com.google.common.base.Optional;
+
+import org.apache.mesos.MesosSchedulerDriver;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+/**
+ * A minimal shim over the constructor to {@link MesosSchedulerDriver} to minimize code that
+ * requires the libmesos native library.
+ */
+class DriverFactoryImpl implements DriverFactory {
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>The credential-accepting {@link MesosSchedulerDriver} constructor is used only when
+   * {@code credentials} is present.
+   */
+  @Override
+  public SchedulerDriver create(
+      Scheduler scheduler,
+      Optional<Protos.Credential> credentials,
+      Protos.FrameworkInfo frameworkInfo,
+      String master) {
+
+    if (credentials.isPresent()) {
+      return new MesosSchedulerDriver(scheduler, frameworkInfo, master, credentials.get());
+    }
+    return new MesosSchedulerDriver(scheduler, frameworkInfo, master);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/mesos/DriverSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/DriverSettings.java b/src/main/java/org/apache/aurora/scheduler/mesos/DriverSettings.java
new file mode 100644
index 0000000..85d471f
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/DriverSettings.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+
+import org.apache.mesos.Protos;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Settings required to create a scheduler driver.
+ */
+@VisibleForTesting
+public class DriverSettings {
+  private final String masterUri;
+  private final Optional<Protos.Credential> credentials;
+  private final Protos.FrameworkInfo frameworkInfo;
+
+  /**
+   * Creates driver settings.
+   *
+   * @param masterUri URI of the master the driver connects to.
+   * @param credentials Credentials for the driver to use, if any.
+   * @param frameworkInfo Framework information handed to the driver.
+   * @throws NullPointerException If any argument is {@code null}.
+   */
+  public DriverSettings(
+      String masterUri,
+      Optional<Protos.Credential> credentials,
+      Protos.FrameworkInfo frameworkInfo) {
+
+    this.masterUri = requireNonNull(masterUri, "masterUri");
+    this.credentials = requireNonNull(credentials, "credentials");
+    this.frameworkInfo = requireNonNull(frameworkInfo, "frameworkInfo");
+  }
+
+  /**
+   * Returns the URI of the master the driver connects to.
+   */
+  public String getMasterUri() {
+    return masterUri;
+  }
+
+  /**
+   * Returns the credentials for the driver to use, or an absent value if there are none.
+   */
+  public Optional<Protos.Credential> getCredentials() {
+    return credentials;
+  }
+
+  /**
+   * Returns the framework information handed to the driver.
+   */
+  public Protos.FrameworkInfo getFrameworkInfo() {
+    return frameworkInfo;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/mesos/LibMesosLoadingModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/LibMesosLoadingModule.java b/src/main/java/org/apache/aurora/scheduler/mesos/LibMesosLoadingModule.java
new file mode 100644
index 0000000..e1a2359
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/LibMesosLoadingModule.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import com.google.inject.AbstractModule;
+
+/**
+ * A module that binds a driver factory which requires the libmesos native library.
+ */
+public class LibMesosLoadingModule extends AbstractModule {
+  @Override
+  protected void configure() {
+    bind(DriverFactory.class).to(DriverFactoryImpl.class);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/66bd6fec/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
new file mode 100644
index 0000000..2d382f7
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+import javax.inject.Qualifier;
+
+import com.google.common.base.Preconditions;
+import com.twitter.common.application.Lifecycle;
+import com.twitter.common.inject.TimedInterceptor.Timed;
+import com.twitter.common.stats.Stats;
+
+import org.apache.aurora.GuiceUtils.AllowUnchecked;
+import org.apache.aurora.scheduler.TaskLauncher;
+import org.apache.aurora.scheduler.base.Conversions;
+import org.apache.aurora.scheduler.base.SchedulerException;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
+import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.MasterInfo;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.SlaveID;
+import org.apache.mesos.Protos.TaskStatus;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Receives callbacks from the Mesos scheduler driver and relays them to the rest of the
+ * scheduler: registration is persisted to storage and published to the event sink, offers and
+ * status updates are passed to the {@link TaskLauncher}s, and driver errors shut down the
+ * application lifecycle.
+ */
+class MesosSchedulerImpl implements Scheduler {
+  private static final Logger LOG = Logger.getLogger(MesosSchedulerImpl.class.getName());
+
+  private final AtomicLong totalResourceOffers = Stats.exportLong("scheduler_resource_offers");
+  // NOTE(review): despite its exported name, this only counts status updates that no task
+  // launcher handled (see statusUpdate).
+  private final AtomicLong totalFailedStatusUpdates = Stats.exportLong("scheduler_status_updates");
+  private final AtomicLong totalFrameworkDisconnects =
+      Stats.exportLong("scheduler_framework_disconnects");
+  private final AtomicLong totalFrameworkReregisters =
+      Stats.exportLong("scheduler_framework_reregisters");
+  private final AtomicLong totalLostExecutors = Stats.exportLong("scheduler_lost_executors");
+
+  private final List<TaskLauncher> taskLaunchers;
+
+  private final Storage storage;
+  private final Lifecycle lifecycle;
+  private final EventSink eventSink;
+  private final Executor executor;
+
+  // Set by registered(); resourceOffers() refuses offers until then. Volatile so that the
+  // write is visible to later callbacks, which may run on a different thread.
+  private volatile boolean isRegistered = false;
+
+  /**
+   * Binding annotation for the executor the incoming Mesos message handler uses.
+   */
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  @interface SchedulerExecutor { }
+
+  /**
+   * Creates a new scheduler.
+   *
+   * @param storage Store to save the framework ID and host attributes into.
+   * @param lifecycle Application lifecycle manager, shut down when the driver reports an error.
+   * @param taskLaunchers Task launchers, which will be used in order. Calls to
+   *     {@link TaskLauncher#willUse(Offer)} and
+   *     {@link TaskLauncher#statusUpdate(TaskStatus)} are propagated to provided
+   *     launchers, ceasing after the first match (based on a return value of
+   *     {@code true}). Every launcher is notified of rescinded offers.
+   * @param eventSink Pubsub sink to send driver status changes to.
+   * @param executor Executor on which incoming offers are processed.
+   */
+  @Inject
+  public MesosSchedulerImpl(
+      Storage storage,
+      Lifecycle lifecycle,
+      List<TaskLauncher> taskLaunchers,
+      EventSink eventSink,
+      @SchedulerExecutor Executor executor) {
+
+    this.storage = requireNonNull(storage);
+    this.lifecycle = requireNonNull(lifecycle);
+    this.taskLaunchers = requireNonNull(taskLaunchers);
+    this.eventSink = requireNonNull(eventSink);
+    this.executor = requireNonNull(executor);
+  }
+
+  @Override
+  public void slaveLost(SchedulerDriver schedulerDriver, SlaveID slaveId) {
+    LOG.info("Received notification of lost slave: " + slaveId);
+  }
+
+  /**
+   * Persists the assigned framework ID, marks this scheduler as registered, and posts a
+   * {@link DriverRegistered} event.
+   */
+  @Override
+  public void registered(
+      SchedulerDriver driver,
+      final FrameworkID frameworkId,
+      MasterInfo masterInfo) {
+
+    LOG.info("Registered with ID " + frameworkId + ", master: " + masterInfo);
+
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(MutableStoreProvider storeProvider) {
+        storeProvider.getSchedulerStore().saveFrameworkId(frameworkId.getValue());
+      }
+    });
+    isRegistered = true;
+    eventSink.post(new DriverRegistered());
+  }
+
+  @Override
+  public void disconnected(SchedulerDriver schedulerDriver) {
+    LOG.warning("Framework disconnected.");
+    totalFrameworkDisconnects.incrementAndGet();
+    eventSink.post(new DriverDisconnected());
+  }
+
+  @Override
+  public void reregistered(SchedulerDriver schedulerDriver, MasterInfo masterInfo) {
+    LOG.info("Framework re-registered with master " + masterInfo);
+    totalFrameworkReregisters.incrementAndGet();
+  }
+
+  /**
+   * Hands the offers to {@code executor}, which saves the offering hosts' attributes and then
+   * presents each offer to the task launchers in order.
+   *
+   * <p>Because that work is deferred to {@code executor}, the {@link Timed} measurement
+   * presumably covers only the hand-off rather than the offer processing itself.
+   *
+   * @throws IllegalStateException If the framework has not yet registered.
+   */
+  @Timed("scheduler_resource_offers")
+  @Override
+  public void resourceOffers(SchedulerDriver driver, final List<Offer> offers) {
+    Preconditions.checkState(isRegistered, "Must be registered before receiving offers.");
+
+    executor.execute(new Runnable() {
+      @Override
+      public void run() {
+        // Host attributes are saved before passing offers elsewhere to ensure that they are
+        // available before attempting to schedule tasks associated with the offers.
+        // TODO(wfarner): Reconsider the requirements here, augment the task scheduler to skip
+        // over offers when the host attributes cannot be found. (AURORA-137)
+        saveHostAttributes(offers);
+        dispatchOffers(offers);
+      }
+    });
+  }
+
+  /**
+   * Stores the host attributes of all offers in a single write operation, to prevent other
+   * threads from securing the storage lock between saves.
+   */
+  private void saveHostAttributes(final List<Offer> offers) {
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override
+      protected void execute(MutableStoreProvider storeProvider) {
+        for (Offer offer : offers) {
+          storeProvider.getAttributeStore()
+              .saveHostAttributes(Conversions.getAttributes(offer));
+        }
+      }
+    });
+  }
+
+  /**
+   * Presents each offer to the task launchers in order, stopping at the first launcher whose
+   * {@link TaskLauncher#willUse(Offer)} returns {@code true}.
+   */
+  private void dispatchOffers(List<Offer> offers) {
+    for (Offer offer : offers) {
+      if (LOG.isLoggable(Level.FINE)) {
+        LOG.log(Level.FINE, String.format("Received offer: %s", offer));
+      }
+      totalResourceOffers.incrementAndGet();
+      for (TaskLauncher launcher : taskLaunchers) {
+        if (launcher.willUse(offer)) {
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Notifies every task launcher that the offer was rescinded.
+   */
+  @Override
+  public void offerRescinded(SchedulerDriver schedulerDriver, OfferID offerId) {
+    LOG.info("Offer rescinded: " + offerId);
+    // NOTE(review): unlike resourceOffers, this runs directly on the calling thread rather
+    // than on the executor, so a rescind may be handled before the matching offer has been
+    // dispatched to the launchers, which would then presumably retain a stale offer.
+    for (TaskLauncher launcher : taskLaunchers) {
+      launcher.cancelOffer(offerId);
+    }
+  }
+
+  /**
+   * Passes the status update to the task launchers in order, stopping at the first one that
+   * handles it. Updates that no launcher handles are logged and counted.
+   *
+   * @throws SchedulerException If a launcher fails to process the update; it is rethrown in an
+   *     effort to NACK the update and abort the driver.
+   */
+  @AllowUnchecked
+  @Timed("scheduler_status_update")
+  @Override
+  public void statusUpdate(SchedulerDriver driver, TaskStatus status) {
+    String infoMsg = status.hasData() ? " with info " + status.getData().toStringUtf8() : "";
+    String coreMsg = status.hasMessage() ? " with core message " + status.getMessage() : "";
+    LOG.info("Received status update for task " + status.getTaskId().getValue()
+        + " in state " + status.getState() + infoMsg + coreMsg);
+
+    try {
+      for (TaskLauncher launcher : taskLaunchers) {
+        if (launcher.statusUpdate(status)) {
+          return;
+        }
+      }
+    } catch (SchedulerException e) {
+      LOG.log(Level.SEVERE, "Status update failed due to scheduler exception: " + e, e);
+      // We re-throw the exception here in an effort to NACK the status update and trigger an
+      // abort of the driver. Previously we directly issued driver.abort(), but the re-entrancy
+      // guarantees of that are uncertain (and we believe it was not working). However, this
+      // was difficult to discern since logging is unreliable during JVM shutdown and we would not
+      // see the above log message.
+      throw e;
+    }
+
+    LOG.warning("Unhandled status update " + status);
+    totalFailedStatusUpdates.incrementAndGet();
+  }
+
+  /**
+   * Logs the error and shuts down the application lifecycle.
+   */
+  @Override
+  public void error(SchedulerDriver driver, String message) {
+    LOG.severe("Received error message: " + message);
+    lifecycle.shutdown();
+  }
+
+  @Override
+  public void executorLost(
+      SchedulerDriver schedulerDriver,
+      ExecutorID executorID,
+      SlaveID slaveID,
+      int status) {
+
+    // Include the slave and status so that the loss can be traced back to a host.
+    LOG.info("Lost executor " + executorID.getValue() + " on slave " + slaveID.getValue()
+        + " with status " + status);
+    totalLostExecutors.incrementAndGet();
+  }
+
+  @Timed("scheduler_framework_message")
+  @Override
+  public void frameworkMessage(
+      SchedulerDriver driver,
+      ExecutorID executorID,
+      SlaveID slave,
+      byte[] data) {
+
+    LOG.warning("Ignoring framework message.");
+  }
+}