You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:31 UTC
[38/51] [partial] Rename twitter* and com.twitter to apache and
org.apache directories to preserve all file history before the refactor.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
new file mode 100644
index 0000000..f90869d
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
@@ -0,0 +1,417 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.base.Supplier;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.Atomics;
+
+import org.apache.mesos.Protos;
+import org.apache.mesos.SchedulerDriver;
+
+import com.twitter.aurora.scheduler.events.PubsubEvent.DriverRegistered;
+import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.Storage.NonVolatileStorage;
+import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.Work;
+import com.twitter.aurora.scheduler.storage.StorageBackfill;
+import com.twitter.common.application.Lifecycle;
+import com.twitter.common.base.Closure;
+import com.twitter.common.base.Closures;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatImpl;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.Clock;
+import com.twitter.common.util.StateMachine;
+import com.twitter.common.util.StateMachine.Transition;
+import com.twitter.common.zookeeper.Group.JoinException;
+import com.twitter.common.zookeeper.ServerSet;
+import com.twitter.common.zookeeper.SingletonService.LeaderControl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.common.zookeeper.SingletonService.LeadershipListener;
+
+/**
+ * The central driver of the scheduler runtime lifecycle. Handles the transitions from startup and
+ * initialization through acting as a standby scheduler / log replica and finally to becoming the
+ * scheduler leader.
+ * <p>
+ * The (enforced) call order to be used with this class is:
+ * <ol>
+ * <li>{@link #prepare()}, to initialize the storage system.</li>
+ * <li>{@link LeadershipListener#onLeading(LeaderControl) onLeading()} on the
+ * {@link LeadershipListener LeadershipListener}
+ * returned from {@link #prepare()}, signaling that this process has exclusive control of the
+ * cluster.</li>
+ * <li>{@link #registered(DriverRegistered) registered()},
+ * indicating that registration with the mesos master has succeeded.
+ * At this point, the scheduler's presence will be announced via
+ * {@link LeaderControl#advertise() advertise()}.</li>
+ * </ol>
+ * If this call order is broken, calls will fail by throwing
+ * {@link java.lang.IllegalStateException}.
+ * <p>
+ * At any point in the lifecycle, the scheduler will respond to
+ * {@link LeadershipListener#onDefeated(com.twitter.common.zookeeper.ServerSet.EndpointStatus)
+ * onDefeated()} by initiating a clean shutdown using {@link Lifecycle#shutdown() shutdown()}.
+ * A clean shutdown will also be initiated if control actions fail during normal state transitions.
+ */
+public class SchedulerLifecycle implements EventSubscriber {
+
+ private static final Logger LOG = Logger.getLogger(SchedulerLifecycle.class.getName());
+
+ private enum State {
+ IDLE, // Initial state of the state machine.
+ PREPARING_STORAGE, // Requested by prepare().
+ STORAGE_PREPARED, // Entered once storage.prepare() has returned normally.
+ LEADER_AWAITING_REGISTRATION, // Requested by onLeading() of the leadership listener.
+ REGISTERED_LEADER, // Requested by registered(DriverRegistered).
+ RUNNING, // Reachable from REGISTERED_LEADER; nothing in this class transitions to it.
+ DEAD // Allowed from every state, including DEAD itself; runs the shutDown callback.
+ }
+
+  /** Matches transitions whose destination state is {@link State#DEAD}. */
+  private static final Predicate<Transition<State>> IS_DEAD =
+      new Predicate<Transition<State>>() {
+        @Override
+        public boolean apply(Transition<State> transition) {
+          return State.DEAD == transition.getTo();
+        }
+      };
+
+  /** Matches transitions to any destination other than {@link State#DEAD}. */
+  private static final Predicate<Transition<State>> NOT_DEAD = Predicates.not(IS_DEAD);
+
+ private final LeadershipListener leadershipListener;
+ private final AtomicBoolean registrationAcked = new AtomicBoolean(false);
+ private final AtomicReference<LeaderControl> leaderControl = Atomics.newReference();
+ private final StateMachine<State> stateMachine;
+
+ @Inject
+ SchedulerLifecycle(
+ final DriverFactory driverFactory,
+ final NonVolatileStorage storage,
+ final Lifecycle lifecycle,
+ final Driver driver,
+ final DriverReference driverRef,
+ final LeadingOptions leadingOptions,
+ final ScheduledExecutorService executorService,
+ final Clock clock) {
+
+ this(
+ driverFactory,
+ storage,
+ lifecycle,
+ driver,
+ driverRef,
+ new DelayedActions() {
+ @Override public void blockingDriverJoin(Runnable runnable) {
+ executorService.execute(runnable);
+ }
+
+ @Override public void onAutoFailover(Runnable runnable) {
+ executorService.schedule(
+ runnable,
+ leadingOptions.leadingTimeLimit.getValue(),
+ leadingOptions.leadingTimeLimit.getUnit().getTimeUnit());
+ }
+
+ @Override public void onRegistrationTimeout(Runnable runnable) {
+ LOG.info(
+ "Giving up on registration in " + leadingOptions.registrationDelayLimit);
+ executorService.schedule(
+ runnable,
+ leadingOptions.registrationDelayLimit.getValue(),
+ leadingOptions.registrationDelayLimit.getUnit().getTimeUnit());
+ }
+ },
+ clock);
+ }
+
+ @VisibleForTesting
+ SchedulerLifecycle( // Exports lifecycle stats and wires up the state machine and its callbacks.
+ final DriverFactory driverFactory,
+ final NonVolatileStorage storage,
+ final Lifecycle lifecycle,
+ // TODO(wfarner): The presence of Driver and DriverReference is quite confusing. Figure out
+ // a clean way to collapse the duties of DriverReference into DriverImpl.
+ final Driver driver,
+ final DriverReference driverRef,
+ final DelayedActions delayedActions,
+ final Clock clock) {
+
+ Stats.export(new StatImpl<Integer>("framework_registered") { // 1 once registration is acked.
+ @Override public Integer read() {
+ return registrationAcked.get() ? 1 : 0;
+ }
+ });
+ for (final State state : State.values()) { // One 0/1 stat per lifecycle state.
+ Stats.export(new StatImpl<Integer>("scheduler_lifecycle_" + state) {
+ @Override public Integer read() {
+ return (state == stateMachine.getState()) ? 1 : 0;
+ }
+ });
+ }
+
+ final Closure<Transition<State>> prepareStorage = new Closure<Transition<State>>() {
+ @Override public void execute(Transition<State> transition) {
+ try { // A RuntimeException from here marks the lifecycle DEAD and is then rethrown.
+ storage.prepare();
+ stateMachine.transition(State.STORAGE_PREPARED);
+ } catch (RuntimeException e) {
+ stateMachine.transition(State.DEAD);
+ throw e;
+ }
+ }
+ };
+
+ final Closure<Transition<State>> handleLeading = new Closure<Transition<State>>() {
+ @Override public void execute(Transition<State> transition) {
+ LOG.info("Elected as leading scheduler!");
+ storage.start(new MutateWork.NoResult.Quiet() { // Backfill runs as the storage start work.
+ @Override protected void execute(MutableStoreProvider storeProvider) {
+ StorageBackfill.backfill(storeProvider, clock);
+ }
+ });
+
+ @Nullable final String frameworkId = storage.consistentRead(
+ new Work.Quiet<String>() {
+ @Override public String apply(StoreProvider storeProvider) {
+ return storeProvider.getSchedulerStore().fetchFrameworkId();
+ }
+ });
+ driverRef.set(driverFactory.apply(frameworkId)); // frameworkId may be null here.
+
+ delayedActions.onRegistrationTimeout(
+ new Runnable() {
+ @Override public void run() {
+ if (!registrationAcked.get()) { // Only fatal if handleRegistered has not run yet.
+ LOG.severe(
+ "Framework has not been registered within the tolerated delay.");
+ stateMachine.transition(State.DEAD);
+ }
+ }
+ });
+
+ delayedActions.onAutoFailover(
+ new Runnable() {
+ @Override public void run() {
+ LOG.info("Triggering automatic failover.");
+ stateMachine.transition(State.DEAD);
+ }
+ });
+
+ Protos.Status status = driver.start();
+ LOG.info("Driver started with code " + status);
+ delayedActions.blockingDriverJoin(new Runnable() {
+ @Override public void run() {
+ // Blocks until driver exits.
+ driver.join();
+ stateMachine.transition(State.DEAD); // Driver exit always ends the lifecycle.
+ }
+ });
+ }
+ };
+
+ final Closure<Transition<State>> handleRegistered = new Closure<Transition<State>>() {
+ @Override public void execute(Transition<State> transition) {
+ registrationAcked.set(true); // Disarms the registration timeout check above.
+ try {
+ leaderControl.get().advertise();
+ } catch (JoinException e) {
+ LOG.log(Level.SEVERE, "Failed to advertise leader, shutting down.", e);
+ stateMachine.transition(State.DEAD);
+ } catch (InterruptedException e) {
+ LOG.log(Level.SEVERE, "Interrupted while advertising leader, shutting down.", e);
+ stateMachine.transition(State.DEAD);
+ Thread.currentThread().interrupt(); // Restore the interrupt flag for the caller.
+ }
+ }
+ };
+
+ final Closure<Transition<State>> shutDown = new Closure<Transition<State>>() {
+ private final AtomicBoolean invoked = new AtomicBoolean(false);
+ @Override public void execute(Transition<State> transition) {
+ if (!invoked.compareAndSet(false, true)) { // Teardown runs at most once.
+ LOG.info("Shutdown already invoked, ignoring extra call.");
+ return;
+ }
+
+ // TODO(wfarner): Consider using something like guava's Closer to abstractly tear down
+ // resources here.
+ try {
+ LeaderControl control = leaderControl.get();
+ if (control != null) { // Null when shutdown happens before onLeading() was called.
+ try {
+ control.leave();
+ } catch (JoinException e) {
+ LOG.log(Level.WARNING, "Failed to leave leadership: " + e, e);
+ } catch (ServerSet.UpdateException e) {
+ LOG.log(Level.WARNING, "Failed to leave server set: " + e, e);
+ }
+ }
+
+ // TODO(wfarner): Re-evaluate tear-down ordering here. Should the top-level shutdown
+ // be invoked first, or the underlying critical components?
+ driver.stop();
+ storage.stop();
+ } finally { // The application lifecycle is shut down even if the teardown above throws.
+ lifecycle.shutdown();
+ }
+ }
+ };
+
+ stateMachine = StateMachine.<State>builder("SchedulerLifecycle")
+ .initialState(State.IDLE)
+ .logTransitions()
+ .addState(
+ Closures.filter(NOT_DEAD, prepareStorage), // Callback filtered to non-DEAD transitions.
+ State.IDLE,
+ State.PREPARING_STORAGE, State.DEAD)
+ .addState(
+ State.PREPARING_STORAGE,
+ State.STORAGE_PREPARED, State.DEAD)
+ .addState(
+ Closures.filter(NOT_DEAD, handleLeading),
+ State.STORAGE_PREPARED,
+ State.LEADER_AWAITING_REGISTRATION, State.DEAD)
+ .addState(
+ Closures.filter(NOT_DEAD, handleRegistered),
+ State.LEADER_AWAITING_REGISTRATION,
+ State.REGISTERED_LEADER, State.DEAD)
+ .addState(
+ State.REGISTERED_LEADER,
+ State.RUNNING, State.DEAD)
+ .addState(
+ State.RUNNING,
+ State.DEAD)
+ .addState(
+ State.DEAD,
+ // Allow cycles in DEAD to prevent throwing and avoid the need for call-site checking.
+ State.DEAD
+ )
+ .onAnyTransition(
+ Closures.filter(IS_DEAD, shutDown)) // shutDown is filtered to transitions into DEAD.
+ .build();
+
+ this.leadershipListener = new SchedulerCandidateImpl(stateMachine, leaderControl);
+ }
+
+ /**
+ * Prepares a scheduler to offer itself as a leader candidate. After this call the scheduler will
+ * host a live log replica and start syncing data from the leader via the log until it gets called
+ * upon to lead.
+ * <p>
+ * Requests the {@code PREPARING_STORAGE} state. The returned listener is the one whose
+ * {@code onLeading()} and {@code onDefeated()} callbacks request the later lifecycle
+ * transitions.
+ *
+ * @return A listener that can be offered for leadership of a distributed election.
+ */
+ public LeadershipListener prepare() {
+ stateMachine.transition(State.PREPARING_STORAGE);
+ return leadershipListener;
+ }
+
+ @Subscribe
+ public void registered(DriverRegistered event) { // Pubsub callback; the event itself is unused.
+ stateMachine.transition(State.REGISTERED_LEADER); // Must follow onLeading(), per the class doc.
+ }
+
+ /**
+ * Maintains a reference to the driver.
+ */
+ static class DriverReference implements Supplier<Optional<SchedulerDriver>> {
+ private final AtomicReference<SchedulerDriver> driver = Atomics.newReference();
+
+ @Override public Optional<SchedulerDriver> get() {
+ return Optional.fromNullable(driver.get());
+ }
+
+ private void set(SchedulerDriver ref) {
+ driver.set(ref);
+ }
+ }
+
+  /**
+   * Leadership listener that turns election callbacks into lifecycle state transitions.
+   */
+  private static class SchedulerCandidateImpl implements LeadershipListener {
+    private final StateMachine<State> lifecycleState;
+    private final AtomicReference<LeaderControl> controlRef;
+
+    SchedulerCandidateImpl(
+        StateMachine<State> stateMachine,
+        AtomicReference<LeaderControl> leaderControl) {
+
+      lifecycleState = stateMachine;
+      controlRef = leaderControl;
+    }
+
+    @Override
+    public void onLeading(LeaderControl control) {
+      // Store the control handle first so it is available once the transition is requested.
+      controlRef.set(control);
+      lifecycleState.transition(State.LEADER_AWAITING_REGISTRATION);
+    }
+
+    @Override
+    public void onDefeated(@Nullable ServerSet.EndpointStatus status) {
+      LOG.severe("Lost leadership, committing suicide.");
+      lifecycleState.transition(State.DEAD);
+    }
+  }
+
+  /**
+   * Limits that bound how long the scheduler waits for framework registration and how long it
+   * serves as leader before abdicating.
+   */
+  public static class LeadingOptions {
+    private final Amount<Long, Time> registrationDelayLimit;
+    private final Amount<Long, Time> leadingTimeLimit;
+
+    /**
+     * Creates a new collection of options for tuning leadership behavior.
+     *
+     * @param registrationDelayLimit Maximum amount of time to wait for framework registration to
+     *                               complete. Must not be negative.
+     * @param leadingTimeLimit Maximum amount of time to serve as leader before abdicating. Must
+     *                         not be negative.
+     * @throws NullPointerException if either argument is {@code null}.
+     * @throws IllegalArgumentException if either limit has a negative value.
+     */
+    public LeadingOptions(
+        Amount<Long, Time> registrationDelayLimit,
+        Amount<Long, Time> leadingTimeLimit) {
+
+      // Each argument is null-checked before it is dereferenced. The per-argument ordering keeps
+      // the exception a caller sees for any given bad input the same as before.
+      checkNotNull(registrationDelayLimit);
+      Preconditions.checkArgument(
+          registrationDelayLimit.getValue() >= 0,
+          "Registration delay limit must be non-negative.");
+      checkNotNull(leadingTimeLimit);
+      Preconditions.checkArgument(
+          leadingTimeLimit.getValue() >= 0,
+          "Leading time limit must be non-negative.");
+
+      this.registrationDelayLimit = registrationDelayLimit;
+      this.leadingTimeLimit = leadingTimeLimit;
+    }
+  }
+
+ @VisibleForTesting
+ interface DelayedActions { // Hooks for deferred lifecycle work; visible so tests can supply them.
+ void blockingDriverJoin(Runnable runnable); // Receives the task that blocks on driver.join().
+
+ void onAutoFailover(Runnable runnable); // Receives the task that forces automatic failover.
+
+ void onRegistrationTimeout(Runnable runnable); // Receives the registration-deadline check.
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
new file mode 100644
index 0000000..be4c2b1
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler;
+
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import javax.inject.Singleton;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.AbstractModule;
+import com.google.inject.PrivateModule;
+import com.google.inject.Provides;
+import com.google.inject.TypeLiteral;
+
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import com.twitter.aurora.scheduler.Driver.DriverImpl;
+import com.twitter.aurora.scheduler.SchedulerLifecycle.DriverReference;
+import com.twitter.aurora.scheduler.SchedulerLifecycle.LeadingOptions;
+import com.twitter.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
+import com.twitter.aurora.scheduler.events.PubsubEventModule;
+import com.twitter.aurora.scheduler.periodic.GcExecutorLauncher;
+import com.twitter.aurora.scheduler.periodic.GcExecutorLauncher.GcExecutorSettings;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+/**
+ * Binding module for top-level scheduling logic.
+ */
+public class SchedulerModule extends AbstractModule {
+
+  @CmdLine(name = "executor_gc_interval",
+      help = "Max interval on which to run the GC executor on a host to clean up dead tasks.")
+  private static final Arg<Amount<Long, Time>> EXECUTOR_GC_INTERVAL =
+      Arg.create(Amount.of(1L, Time.HOURS));
+
+  @CmdLine(name = "gc_executor_path", help = "Path to the gc executor launch script.")
+  private static final Arg<String> GC_EXECUTOR_PATH = Arg.create(null);
+
+  @CmdLine(name = "max_registration_delay",
+      help = "Max allowable delay to allow the driver to register before aborting")
+  private static final Arg<Amount<Long, Time>> MAX_REGISTRATION_DELAY =
+      Arg.create(Amount.of(1L, Time.MINUTES));
+
+  @CmdLine(name = "max_leading_duration",
+      help = "After leading for this duration, the scheduler should commit suicide.")
+  private static final Arg<Amount<Long, Time>> MAX_LEADING_DURATION =
+      Arg.create(Amount.of(1L, Time.DAYS));
+
+  @Override
+  protected void configure() {
+    // Driver and the supplier view of the driver reference.
+    bind(Driver.class).to(DriverImpl.class);
+    bind(DriverImpl.class).in(Singleton.class);
+    TypeLiteral<Supplier<Optional<SchedulerDriver>>> driverSupplierType =
+        new TypeLiteral<Supplier<Optional<SchedulerDriver>>>() { };
+    bind(driverSupplierType).to(DriverReference.class);
+    bind(DriverReference.class).in(Singleton.class);
+
+    // Mesos scheduler callback implementation.
+    bind(Scheduler.class).to(MesosSchedulerImpl.class);
+    bind(MesosSchedulerImpl.class).in(Singleton.class);
+
+    bind(TaskIdGenerator.class).to(TaskIdGeneratorImpl.class);
+
+    // GC executor settings come from the command line; the launch script path is optional.
+    GcExecutorSettings gcSettings = new GcExecutorSettings(
+        EXECUTOR_GC_INTERVAL.get(),
+        Optional.fromNullable(GC_EXECUTOR_PATH.get()));
+    bind(GcExecutorSettings.class).toInstance(gcSettings);
+
+    bind(GcExecutorLauncher.class).in(Singleton.class);
+    bind(UserTaskLauncher.class).in(Singleton.class);
+
+    install(new LifecycleModule());
+
+    PubsubEventModule.bindSubscriber(binder(), SchedulerLifecycle.class);
+    PubsubEventModule.bindSubscriber(binder(), TaskVars.class);
+  }
+
+  /**
+   * Private module that binds the leading options and the lifecycle executor, and exposes only
+   * {@link SchedulerLifecycle}.
+   */
+  private static final class LifecycleModule extends PrivateModule {
+    @Override
+    protected void configure() {
+      LeadingOptions options =
+          new LeadingOptions(MAX_REGISTRATION_DELAY.get(), MAX_LEADING_DURATION.get());
+      bind(LeadingOptions.class).toInstance(options);
+
+      // Single daemon thread used for the lifecycle's delayed actions.
+      ScheduledExecutorService lifecycleExecutor = Executors.newScheduledThreadPool(
+          1,
+          new ThreadFactoryBuilder().setNameFormat("Lifecycle-%d").setDaemon(true).build());
+      bind(ScheduledExecutorService.class).toInstance(lifecycleExecutor);
+
+      bind(SchedulerLifecycle.class).in(Singleton.class);
+      expose(SchedulerLifecycle.class);
+    }
+  }
+
+  /**
+   * Provides the task launchers as a list with the GC executor launcher first, followed by the
+   * user task launcher.
+   */
+  @Provides
+  @Singleton
+  List<TaskLauncher> provideTaskLaunchers(
+      GcExecutorLauncher gcLauncher,
+      UserTaskLauncher userTaskLauncher) {
+
+    return ImmutableList.<TaskLauncher>of(gcLauncher, userTaskLauncher);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/TaskIdGenerator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskIdGenerator.java b/src/main/java/org/apache/aurora/scheduler/TaskIdGenerator.java
new file mode 100644
index 0000000..240649e
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/TaskIdGenerator.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler;
+
+import java.util.UUID;
+import java.util.regex.Pattern;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Preconditions;
+
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+import com.twitter.common.util.Clock;
+
+/**
+ * A function that generates universally-unique (not guaranteed, but highly confident) task IDs.
+ */
+public interface TaskIdGenerator {
+
+  /**
+   * Generates a universally-unique ID for the task. This is not necessarily a repeatable
+   * operation, two subsequent invocations with the same object need not return the same value.
+   *
+   * @param task Configuration of the task to create an ID for.
+   * @param instanceId Instance ID for the task.
+   * @return A universally-unique ID for the task.
+   */
+  String generate(ITaskConfig task, int instanceId);
+
+  /**
+   * Generator that joins the current time, the task's role, environment and job name, the
+   * instance ID and a random UUID with dashes. Any character in the result that is not a word
+   * character or a dash is replaced with a dash.
+   */
+  class TaskIdGeneratorImpl implements TaskIdGenerator {
+    private static final String SEPARATOR = "-";
+
+    // Compiled once rather than on every call, which is what String.replaceAll would do.
+    private static final Pattern DISALLOWED_CHARS = Pattern.compile("[^\\w-]");
+
+    private final Clock clock;
+
+    @Inject
+    TaskIdGeneratorImpl(Clock clock) {
+      this.clock = Preconditions.checkNotNull(clock);
+    }
+
+    @Override
+    public String generate(ITaskConfig task, int instanceId) {
+      String rawId = new StringBuilder()
+          .append(clock.nowMillis())               // Allows chronological sorting.
+          .append(SEPARATOR)
+          .append(task.getOwner().getRole())       // Identification and collision prevention.
+          .append(SEPARATOR)
+          .append(task.getEnvironment())
+          .append(SEPARATOR)
+          .append(task.getJobName())
+          .append(SEPARATOR)
+          .append(instanceId)                      // Collision prevention within job.
+          .append(SEPARATOR)
+          .append(UUID.randomUUID())               // Just-in-case collision prevention.
+          .toString();
+
+      // Constrain character set.
+      return DISALLOWED_CHARS.matcher(rawId).replaceAll(SEPARATOR);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
new file mode 100644
index 0000000..aade6da
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler;
+
+import com.google.common.base.Optional;
+
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.TaskStatus;
+
+/**
+ * A receiver of resource offers and task status updates.
+ */
+public interface TaskLauncher {
+
+ /**
+ * Grants a resource offer to the task launcher, which will be passed to any subsequent task
+ * launchers if this one does not accept.
+ * <p>
+ * A task launcher may choose to retain an offer for later use. Any retained offers must be
+ * cleaned up with {@link #cancelOffer(OfferID)}.
+ *
+ * @param offer The resource offer.
+ * @return A task, absent if the launcher chooses not to accept the offer.
+ */
+ Optional<TaskInfo> createTask(Offer offer);
+
+ /**
+ * Informs the launcher that a status update has been received for a task. If the task is not
+ * associated with the launcher, it should return {@code false} so that another launcher may
+ * receive it.
+ *
+ * @param status The status update.
+ * @return {@code true} if the status is relevant to the launcher and should not be delivered to
+ * other launchers, {@code false} otherwise.
+ */
+ boolean statusUpdate(TaskStatus status);
+
+ /**
+ * Informs the launcher that a previously-advertised offer is canceled and may not be used.
+ * A launcher that retained the offer after {@link #createTask(Offer)} is expected to discard it
+ * here.
+ *
+ * @param offer The ID of the canceled offer.
+ */
+ void cancelOffer(OfferID offer);
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/TaskVars.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskVars.java b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
new file mode 100644
index 0000000..5574631
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import com.google.common.eventbus.Subscribe;
+
+import com.twitter.aurora.gen.Attribute;
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import com.twitter.aurora.scheduler.events.PubsubEvent.StorageStarted;
+import com.twitter.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import com.twitter.aurora.scheduler.events.PubsubEvent.TasksDeleted;
+import com.twitter.aurora.scheduler.storage.AttributeStore;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.Work;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.common.stats.StatsProvider;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A container that tracks and exports stat counters for tasks.
+ */
+class TaskVars implements EventSubscriber {
+  private static final Logger LOG = Logger.getLogger(TaskVars.class.getName());
+
+  /** Selects the host attribute named "rack". */
+  private static final Predicate<Attribute> IS_RACK = new Predicate<Attribute>() {
+    @Override
+    public boolean apply(Attribute attribute) {
+      return "rack".equals(attribute.getName());
+    }
+  };
+
+  /** Extracts the single value of an attribute. */
+  private static final Function<Attribute, String> ATTR_VALUE = new Function<Attribute, String>() {
+    @Override
+    public String apply(Attribute attribute) {
+      return Iterables.getOnlyElement(attribute.getValues());
+    }
+  };
+
+  // Pubsub events that arrive before storage has fully started are ignored. Without this, a
+  // StorageStarted consumer invoked before storageStarted() runs here could fire events for
+  // tasks that have not been counted yet; a tasksDeleted event, for example, would drive a
+  // counter negative.
+  private volatile boolean storageStarted = false;
+
+  private final LoadingCache<String, AtomicLong> countersByStatus;
+  private final LoadingCache<String, AtomicLong> countersByRack;
+
+  private final Storage storage;
+
+  @Inject
+  TaskVars(Storage storage, final StatsProvider statProvider) {
+    this.storage = checkNotNull(storage);
+    checkNotNull(statProvider);
+
+    // Status counters are keyed by their full stat name.
+    CacheLoader<String, AtomicLong> statusCounterLoader = new CacheLoader<String, AtomicLong>() {
+      @Override
+      public AtomicLong load(String statName) {
+        return statProvider.makeCounter(statName);
+      }
+    };
+    // Rack counters are keyed by rack and derive their stat name from it.
+    CacheLoader<String, AtomicLong> rackCounterLoader = new CacheLoader<String, AtomicLong>() {
+      @Override
+      public AtomicLong load(String rack) {
+        return statProvider.makeCounter(rackStatName(rack));
+      }
+    };
+
+    countersByStatus = CacheBuilder.newBuilder().build(statusCounterLoader);
+    countersByRack = CacheBuilder.newBuilder().build(rackCounterLoader);
+  }
+
+  /** Returns the name of the stat that counts tasks in {@code status}. */
+  @VisibleForTesting
+  static String getVarName(ScheduleStatus status) {
+    return "task_store_" + status;
+  }
+
+  /** Returns the name of the stat that counts lost tasks for {@code rack}. */
+  @VisibleForTesting
+  static String rackStatName(String rack) {
+    return "tasks_lost_rack_" + rack;
+  }
+
+  private AtomicLong getCounter(ScheduleStatus status) {
+    String statName = getVarName(status);
+    return countersByStatus.getUnchecked(statName);
+  }
+
+  private void incrementCount(ScheduleStatus status) {
+    getCounter(status).incrementAndGet();
+  }
+
+  private void decrementCount(ScheduleStatus status) {
+    getCounter(status).decrementAndGet();
+  }
+
+  /**
+   * Moves a task's count from its old status to its current one and, for tasks that became
+   * LOST, bumps the counter of the rack hosting the task.
+   */
+  @Subscribe
+  public void taskChangedState(TaskStateChange stateChange) {
+    if (!storageStarted) {
+      return;
+    }
+
+    ScheduleStatus previousStatus = stateChange.getOldState();
+    if (ScheduleStatus.INIT != previousStatus) {
+      decrementCount(previousStatus);
+    }
+    incrementCount(stateChange.getTask().getStatus());
+
+    if (ScheduleStatus.LOST == stateChange.getNewState()) {
+      recordLostTaskRack(stateChange.getTask().getAssignedTask().getSlaveHost());
+    }
+  }
+
+  /** Increments the lost-task counter for the rack of {@code host}, if a rack is known. */
+  private void recordLostTaskRack(final String host) {
+    Optional<String> rackName = storage.consistentRead(new Work.Quiet<Optional<String>>() {
+      @Override
+      public Optional<String> apply(StoreProvider storeProvider) {
+        return FluentIterable
+            .from(AttributeStore.Util.attributesOrNone(storeProvider, host))
+            .firstMatch(IS_RACK)
+            .transform(ATTR_VALUE);
+      }
+    });
+
+    if (!rackName.isPresent()) {
+      LOG.warning("Failed to find rack attribute associated with host " + host);
+      return;
+    }
+    countersByRack.getUnchecked(rackName.get()).incrementAndGet();
+  }
+
+  /** Seeds the status counters from the task store, then starts accepting pubsub events. */
+  @Subscribe
+  public void storageStarted(StorageStarted event) {
+    for (IScheduledTask stored : Storage.Util.consistentFetchTasks(storage, Query.unscoped())) {
+      incrementCount(stored.getStatus());
+    }
+
+    // Touch the counter of every status so that a zero-valued stat exists even for statuses
+    // that no stored task currently has.
+    for (ScheduleStatus each : ScheduleStatus.values()) {
+      getCounter(each);
+    }
+    storageStarted = true;
+  }
+
+  /** Decrements the status counter of every deleted task. */
+  @Subscribe
+  public void tasksDeleted(final TasksDeleted event) {
+    if (!storageStarted) {
+      return;
+    }
+
+    for (IScheduledTask removed : event.getTasks()) {
+      decrementCount(removed.getStatus());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
new file mode 100644
index 0000000..0fb3bbb
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.TaskStatus;
+
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.scheduler.async.OfferQueue;
+import com.twitter.aurora.scheduler.base.Conversions;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.SchedulerException;
+import com.twitter.aurora.scheduler.state.StateManager;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A task launcher that matches resource offers against user tasks.
+ */
+class UserTaskLauncher implements TaskLauncher {
+
+ private static final Logger LOG = Logger.getLogger(UserTaskLauncher.class.getName());
+
+ @VisibleForTesting
+ static final String MEMORY_LIMIT_EXCEEDED = "MEMORY STATISTICS";
+
+ @VisibleForTesting
+ static final String MEMORY_LIMIT_DISPLAY = "Task used more memory than requested.";
+
+ private final OfferQueue offerQueue;
+ private final StateManager stateManager;
+
+ @Inject
+ UserTaskLauncher(OfferQueue offerQueue, StateManager stateManager) {
+ this.offerQueue = checkNotNull(offerQueue);
+ this.stateManager = checkNotNull(stateManager);
+ }
+
+ @Override
+ public Optional<TaskInfo> createTask(Offer offer) {
+ checkNotNull(offer);
+
+ offerQueue.addOffer(offer);
+ return Optional.absent();
+ }
+
+ @Override
+ public synchronized boolean statusUpdate(TaskStatus status) {
+ @Nullable String message = null;
+ if (status.hasMessage()) {
+ message = status.getMessage();
+ }
+
+ try {
+ ScheduleStatus translatedState = Conversions.convertProtoState(status.getState());
+ // TODO(William Farner): Remove this hack once Mesos API change is done.
+ // Tracked by: https://issues.apache.org/jira/browse/MESOS-343
+ if ((translatedState == ScheduleStatus.FAILED)
+ && (message != null)
+ && (message.contains(MEMORY_LIMIT_EXCEEDED))) {
+ message = MEMORY_LIMIT_DISPLAY;
+ }
+
+ stateManager.changeState(
+ Query.taskScoped(status.getTaskId().getValue()),
+ translatedState,
+ Optional.fromNullable(message));
+ } catch (SchedulerException e) {
+ LOG.log(Level.WARNING, "Failed to update status for: " + status, e);
+ throw e;
+ }
+ return true;
+ }
+
+ @Override
+ public void cancelOffer(OfferID offer) {
+ offerQueue.cancelOffer(offer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
new file mode 100644
index 0000000..24702b0
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.app;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import com.google.inject.AbstractModule;
+import com.google.inject.Key;
+import com.google.inject.Provides;
+
+import org.apache.mesos.Scheduler;
+import org.apache.zookeeper.data.ACL;
+
+import com.twitter.aurora.GuiceUtils;
+import com.twitter.aurora.scheduler.SchedulerModule;
+import com.twitter.aurora.scheduler.async.AsyncModule;
+import com.twitter.aurora.scheduler.events.PubsubEventModule;
+import com.twitter.aurora.scheduler.filter.SchedulingFilterImpl;
+import com.twitter.aurora.scheduler.http.ClusterName;
+import com.twitter.aurora.scheduler.http.ServletModule;
+import com.twitter.aurora.scheduler.metadata.MetadataModule;
+import com.twitter.aurora.scheduler.quota.QuotaModule;
+import com.twitter.aurora.scheduler.state.StateModule;
+import com.twitter.aurora.scheduler.stats.AsyncStatsModule;
+import com.twitter.common.application.ShutdownRegistry;
+import com.twitter.common.application.modules.LifecycleModule;
+import com.twitter.common.base.Command;
+import com.twitter.common.inject.TimedInterceptor;
+import com.twitter.common.net.pool.DynamicHostSet;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.util.Clock;
+import com.twitter.common.zookeeper.ServerSet;
+import com.twitter.common.zookeeper.ServerSetImpl;
+import com.twitter.common.zookeeper.SingletonService;
+import com.twitter.common.zookeeper.ZooKeeperClient;
+import com.twitter.common.zookeeper.ZooKeeperClient.Credentials;
+import com.twitter.common.zookeeper.ZooKeeperUtils;
+import com.twitter.common.zookeeper.guice.client.ZooKeeperClientModule.ClientConfig;
+import com.twitter.thrift.ServiceInstance;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.common.base.MorePreconditions.checkNotBlank;
+
+/**
+ * Guice bindings for the aurora scheduler application: installs the scheduler's component
+ * modules and provides the ZooKeeper-backed server set and singleton service.
+ */
+class AppModule extends AbstractModule {
+  private static final Logger LOG = Logger.getLogger(AppModule.class.getName());
+
+  private final String clusterName;
+  private final String serverSetPath;
+  private final ClientConfig zkClientConfig;
+
+  /**
+   * Creates the application module.
+   *
+   * @param clusterName Non-blank name of the cluster, bound under {@link ClusterName}.
+   * @param serverSetPath Non-blank ZooKeeper path used for the server set.
+   * @param zkClientConfig ZooKeeper client configuration, consulted when selecting ACLs.
+   */
+  AppModule(String clusterName, String serverSetPath, ClientConfig zkClientConfig) {
+    this.clusterName = checkNotBlank(clusterName);
+    this.serverSetPath = checkNotBlank(serverSetPath);
+    this.zkClientConfig = checkNotNull(zkClientConfig);
+  }
+
+  @Override
+  protected void configure() {
+    // Intercepted method timings, plus context classloader repair and exception trapping for
+    // the mesos Scheduler interface.
+    TimedInterceptor.bind(binder());
+    GuiceUtils.bindJNIContextClassLoader(binder(), Scheduler.class);
+    GuiceUtils.bindExceptionTrap(binder(), Scheduler.class);
+
+    bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
+
+    bind(Key.get(String.class, ClusterName.class)).toInstance(clusterName);
+
+    // Filter layering: notifier filter -> base impl
+    PubsubEventModule.bind(binder(), SchedulingFilterImpl.class);
+    bind(SchedulingFilterImpl.class).in(Singleton.class);
+
+    LifecycleModule.bindStartupAction(binder(), RegisterShutdownStackPrinter.class);
+
+    install(new AsyncModule());
+    install(new AsyncStatsModule());
+    install(new MetadataModule());
+    install(new QuotaModule());
+    install(new ServletModule());
+    install(new SchedulerModule());
+    install(new StateModule());
+
+    bind(StatsProvider.class).toInstance(Stats.STATS_PROVIDER);
+  }
+
+  /**
+   * Startup command that registers a shutdown action logging the name, id and stack trace of
+   * the thread that runs it, to help identify the initiator of a shutdown.
+   */
+  private static class RegisterShutdownStackPrinter implements Command {
+    /** Renders a stack frame as {@code class.method(file:line)}. */
+    private static final Function<StackTraceElement, String> STACK_ELEM_TOSTRING =
+        new Function<StackTraceElement, String>() {
+          @Override public String apply(StackTraceElement frame) {
+            String location =
+                String.format("(%s:%s)", frame.getFileName(), frame.getLineNumber());
+            return frame.getClassName() + "." + frame.getMethodName() + location;
+          }
+        };
+
+    private final ShutdownRegistry shutdownRegistry;
+
+    @Inject
+    RegisterShutdownStackPrinter(ShutdownRegistry shutdownRegistry) {
+      this.shutdownRegistry = shutdownRegistry;
+    }
+
+    @Override
+    public void execute() {
+      shutdownRegistry.addAction(new Command() {
+        @Override public void execute() {
+          Thread current = Thread.currentThread();
+          String header = "Thread: " + current.getName() + " (id " + current.getId() + ")";
+          Iterable<String> frames =
+              Iterables.transform(Arrays.asList(current.getStackTrace()), STACK_ELEM_TOSTRING);
+          String message = header + "\n" + Joiner.on("\n ").join(frames);
+
+          LOG.info("Shutdown initiated by: " + message);
+        }
+      });
+    }
+  }
+
+  /**
+   * Selects the ZooKeeper ACLs: open ACLs (with a logged warning) when the client has no
+   * credentials, otherwise everyone-read / creator-all.
+   */
+  @Provides
+  @Singleton
+  List<ACL> provideAcls() {
+    if (zkClientConfig.credentials != Credentials.NONE) {
+      return ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL;
+    }
+
+    LOG.warning("Running without ZooKeeper digest credentials. ZooKeeper ACLs are disabled.");
+    return ZooKeeperUtils.OPEN_ACL_UNSAFE;
+  }
+
+  /** Provides the server set rooted at the configured server set path. */
+  @Provides
+  @Singleton
+  ServerSet provideServerSet(ZooKeeperClient client, List<ACL> zooKeeperAcls) {
+    return new ServerSetImpl(client, zooKeeperAcls, serverSetPath);
+  }
+
+  /** Exposes the server set under its {@link DynamicHostSet} type. */
+  @Provides
+  @Singleton
+  DynamicHostSet<ServiceInstance> provideSchedulerHostSet(ServerSet serverSet) {
+    // Used for a type re-binding of the serverset.
+    return serverSet;
+  }
+
+  /**
+   * Provides the singleton service, built from the server set and a singleton candidate
+   * created at the server set path.
+   */
+  @Provides
+  @Singleton
+  SingletonService provideSingletonService(
+      ZooKeeperClient client,
+      ServerSet serverSet,
+      List<ACL> zookeeperAcls) {
+
+    return new SingletonService(
+        serverSet,
+        SingletonService.createSingletonCandidate(client, serverSetPath, zookeeperAcls));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/app/Log4jConfigurator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/Log4jConfigurator.java b/src/main/java/org/apache/aurora/scheduler/app/Log4jConfigurator.java
new file mode 100644
index 0000000..0ea2204
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/app/Log4jConfigurator.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.app;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import com.twitter.common.logging.RootLogConfig;
+import com.twitter.common.logging.RootLogConfig.Configuration;
+import com.twitter.common.logging.log4j.GlogLayout;
+
+/**
+ * Configures log4j logging.
+ */
+final class Log4jConfigurator {
+ private static final java.util.logging.Logger LOG =
+ java.util.logging.Logger.getLogger(Log4jConfigurator.class.getName());
+
+ /**
+ * Configures log4j to log to stderr with a glog format.
+ *
+ * @param glogConfig The glog configuration in effect.
+ */
+ static void configureConsole(Configuration glogConfig) {
+ Preconditions.checkNotNull(glogConfig);
+
+ BasicConfigurator.configure(
+ new ConsoleAppender(new GlogLayout(), ConsoleAppender.SYSTEM_ERR));
+ Logger.getRootLogger().setLevel(getLevel(glogConfig));
+ }
+
+ private static Level getLevel(RootLogConfig.Configuration logConfig) {
+ switch (logConfig.getVlog()) {
+ case FINEST: // fall through
+ case FINER: // fall through
+ case FINE: // fall through
+ case CONFIG:
+ return Level.TRACE;
+ case INFO:
+ return Level.INFO;
+ case WARNING:
+ return Level.WARN;
+ case SEVERE:
+ return Level.ERROR;
+ default:
+ LOG.warning("Mapping unexpected vlog value of " + logConfig.getVlog() + " to log4j TRACE");
+ return Level.TRACE;
+ }
+ }
+
+ private Log4jConfigurator() {
+ // Utility class.
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/app/Modules.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/Modules.java b/src/main/java/org/apache/aurora/scheduler/app/Modules.java
new file mode 100644
index 0000000..72a80e1
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/app/Modules.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.app;
+
+import com.google.inject.Module;
+import com.google.inject.PrivateModule;
+
+/**
+ * Static helpers for instantiating guice modules by class and limiting what they expose.
+ */
+final class Modules {
+
+  private Modules() {
+    // Utility class
+  }
+
+  /**
+   * Instantiates a module via its no-arg constructor.
+   *
+   * @param moduleClass The module class to instantiate.
+   * @return A new instance of the module.
+   * @throws IllegalArgumentException If the class cannot be instantiated or accessed.
+   */
+  static Module getModule(Class<? extends Module> moduleClass) {
+    return instantiateModule(moduleClass);
+  }
+
+  // Defensively wrap each module provided on the command-line in a PrivateModule that only
+  // exposes requested classes to ensure that we don't depend on surprise extra bindings across
+  // different implementations.
+  static Module wrapInPrivateModule(
+      Class<? extends Module> moduleClass,
+      final Iterable<Class<?>> exposedClasses) {
+
+    // Instantiate eagerly so that a bad module class fails here rather than at configure time.
+    final Module delegate = instantiateModule(moduleClass);
+    return new PrivateModule() {
+      @Override protected void configure() {
+        install(delegate);
+        for (Class<?> exposed : exposedClasses) {
+          expose(exposed);
+        }
+      }
+    };
+  }
+
+  private static Module instantiateModule(final Class<? extends Module> moduleClass) {
+    String failureFormat;
+    Exception failure;
+    try {
+      return moduleClass.newInstance();
+    } catch (InstantiationException e) {
+      failureFormat = "Failed to instantiate module %s. Are you sure it has a no-arg constructor?";
+      failure = e;
+    } catch (IllegalAccessException e) {
+      failureFormat = "Failed to instantiate module %s. Are you sure it's public?";
+      failure = e;
+    }
+
+    // Only reached when instantiation failed; report the class name and keep the cause.
+    throw new IllegalArgumentException(
+        String.format(failureFormat, moduleClass.getName()),
+        failure);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
new file mode 100644
index 0000000..693c364
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.app;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.logging.Logger;
+
+import javax.annotation.Nonnegative;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
+import com.google.inject.AbstractModule;
+import com.google.inject.Module;
+
+import com.twitter.aurora.auth.CapabilityValidator;
+import com.twitter.aurora.auth.SessionValidator;
+import com.twitter.aurora.auth.UnsecureAuthModule;
+import com.twitter.aurora.scheduler.DriverFactory;
+import com.twitter.aurora.scheduler.DriverFactory.DriverFactoryImpl;
+import com.twitter.aurora.scheduler.MesosTaskFactory.ExecutorConfig;
+import com.twitter.aurora.scheduler.SchedulerLifecycle;
+import com.twitter.aurora.scheduler.cron.CronPredictor;
+import com.twitter.aurora.scheduler.cron.CronScheduler;
+import com.twitter.aurora.scheduler.cron.noop.NoopCronModule;
+import com.twitter.aurora.scheduler.local.IsolatedSchedulerModule;
+import com.twitter.aurora.scheduler.log.mesos.MesosLogStreamModule;
+import com.twitter.aurora.scheduler.storage.backup.BackupModule;
+import com.twitter.aurora.scheduler.storage.log.LogStorage;
+import com.twitter.aurora.scheduler.storage.log.LogStorageModule;
+import com.twitter.aurora.scheduler.storage.log.SnapshotStoreImpl;
+import com.twitter.aurora.scheduler.storage.mem.MemStorageModule;
+import com.twitter.aurora.scheduler.thrift.ThriftConfiguration;
+import com.twitter.aurora.scheduler.thrift.ThriftModule;
+import com.twitter.aurora.scheduler.thrift.auth.ThriftAuthModule;
+import com.twitter.common.application.AbstractApplication;
+import com.twitter.common.application.AppLauncher;
+import com.twitter.common.application.Lifecycle;
+import com.twitter.common.application.modules.HttpModule;
+import com.twitter.common.application.modules.LocalServiceRegistry;
+import com.twitter.common.application.modules.LogModule;
+import com.twitter.common.application.modules.StatsModule;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.args.constraints.NotEmpty;
+import com.twitter.common.args.constraints.NotNull;
+import com.twitter.common.inject.Bindings;
+import com.twitter.common.logging.RootLogConfig;
+import com.twitter.common.zookeeper.Group;
+import com.twitter.common.zookeeper.SingletonService;
+import com.twitter.common.zookeeper.SingletonService.LeadershipListener;
+import com.twitter.common.zookeeper.guice.client.ZooKeeperClientModule;
+import com.twitter.common.zookeeper.guice.client.ZooKeeperClientModule.ClientConfig;
+import com.twitter.common.zookeeper.guice.client.flagged.FlaggedClientConfig;
+
+/**
+ * Launcher for the aurora scheduler.
+ */
+public class SchedulerMain extends AbstractApplication {
+
+ private static final Logger LOG = Logger.getLogger(SchedulerMain.class.getName());
+
+ @CmdLine(name = "testing_isolated_scheduler",
+ help = "If true, run in a testing mode with the scheduler isolated from other components.")
+ private static final Arg<Boolean> ISOLATED_SCHEDULER = Arg.create(false);
+
+ @NotNull
+ @CmdLine(name = "cluster_name", help = "Name to identify the cluster being served.")
+ private static final Arg<String> CLUSTER_NAME = Arg.create();
+
+ @NotNull
+ @NotEmpty
+ @CmdLine(name = "serverset_path", help = "ZooKeeper ServerSet path to register at.")
+ private static final Arg<String> SERVERSET_PATH = Arg.create();
+
+ @CmdLine(name = "mesos_ssl_keyfile",
+ help = "JKS keyfile for operating the Mesos Thrift-over-SSL interface.")
+ private static final Arg<File> MESOS_SSL_KEY_FILE = Arg.create();
+
+ @Nonnegative
+ @CmdLine(name = "thrift_port", help = "Thrift server port.")
+ private static final Arg<Integer> THRIFT_PORT = Arg.create(0);
+
+ @NotNull
+ @CmdLine(name = "thermos_executor_path", help = "Path to the thermos executor launch script.")
+ private static final Arg<String> THERMOS_EXECUTOR_PATH = Arg.create();
+
+ @CmdLine(name = "auth_module",
+ help = "A Guice module to provide auth bindings. NOTE: The default is unsecure.")
+ private static final Arg<? extends Class<? extends Module>> AUTH_MODULE =
+ Arg.create(UnsecureAuthModule.class);
+
+ private static final Iterable<Class<?>> AUTH_MODULE_CLASSES = ImmutableList.<Class<?>>builder()
+ .add(SessionValidator.class)
+ .add(CapabilityValidator.class)
+ .build();
+
+ @CmdLine(name = "cron_module",
+ help = "A Guice module to provide cron bindings. NOTE: The default is a no-op.")
+ private static final Arg<? extends Class<? extends Module>> CRON_MODULE =
+ Arg.create(NoopCronModule.class);
+
+ private static final Iterable<Class<?>> CRON_MODULE_CLASSES = ImmutableList.<Class<?>>builder()
+ .add(CronPredictor.class)
+ .add(CronScheduler.class)
+ .build();
+
+ // TODO(Suman Karumuri): Pass in AUTH and CRON modules as extra modules
+ @CmdLine(name = "extra_modules",
+ help = "A list of modules that provide additional functionality.")
+ private static final Arg<List<Class<? extends Module>>> EXTRA_MODULES =
+ Arg.create((List<Class<? extends Module>>) ImmutableList.<Class<? extends Module>>of());
+
+ @Inject private SingletonService schedulerService;
+ @Inject private LocalServiceRegistry serviceRegistry;
+ @Inject private SchedulerLifecycle schedulerLifecycle;
+ @Inject private Lifecycle appLifecycle;
+ @Inject private Optional<RootLogConfig.Configuration> glogConfig;
+
+ private static Iterable<? extends Module> getSystemModules() {
+ return ImmutableList.of(
+ new LogModule(),
+ new HttpModule(),
+ new StatsModule()
+ );
+ }
+
+ private static Iterable<? extends Module> getExtraModules() {
+ Builder<Module> modules = ImmutableList.builder();
+ modules.add(Modules.wrapInPrivateModule(AUTH_MODULE.get(), AUTH_MODULE_CLASSES))
+ .add(Modules.wrapInPrivateModule(CRON_MODULE.get(), CRON_MODULE_CLASSES));
+
+ for (Class<? extends Module> moduleClass : EXTRA_MODULES.get()) {
+ modules.add(Modules.getModule(moduleClass));
+ }
+
+ return modules.build();
+ }
+
+ static Iterable<? extends Module> getModules(
+ String clusterName,
+ String serverSetPath,
+ ClientConfig zkClientConfig) {
+
+ return ImmutableList.<Module>builder()
+ .addAll(getSystemModules())
+ .add(new AppModule(clusterName, serverSetPath, zkClientConfig))
+ .addAll(getExtraModules())
+ .add(new LogStorageModule())
+ .add(new MemStorageModule(Bindings.annotatedKeyFactory(LogStorage.WriteBehind.class)))
+ .add(new ThriftModule())
+ .add(new ThriftAuthModule())
+ .build();
+ }
+
+ @Override
+ public Iterable<? extends Module> getModules() {
+ Module additional;
+ final ClientConfig zkClientConfig = FlaggedClientConfig.create();
+ if (ISOLATED_SCHEDULER.get()) {
+ additional = new IsolatedSchedulerModule();
+ } else {
+ // TODO(Kevin Sweeney): Push these bindings down into a "production" module.
+ additional = new AbstractModule() {
+ @Override protected void configure() {
+ bind(DriverFactory.class).to(DriverFactoryImpl.class);
+ bind(DriverFactoryImpl.class).in(Singleton.class);
+ install(new MesosLogStreamModule(zkClientConfig));
+ }
+ };
+ }
+
+ Module configModule = new AbstractModule() {
+ @Override protected void configure() {
+ bind(ThriftConfiguration.class).toInstance(new ThriftConfiguration() {
+ @Override public Optional<InputStream> getSslKeyStream() throws FileNotFoundException {
+ if (MESOS_SSL_KEY_FILE.hasAppliedValue()) {
+ return Optional.<InputStream>of(new FileInputStream(MESOS_SSL_KEY_FILE.get()));
+ } else {
+ return Optional.absent();
+ }
+ }
+
+ @Override public int getServingPort() {
+ return THRIFT_PORT.get();
+ }
+ });
+ bind(ExecutorConfig.class).toInstance(new ExecutorConfig(THERMOS_EXECUTOR_PATH.get()));
+ }
+ };
+
+ return ImmutableList.<Module>builder()
+ .add(new BackupModule(SnapshotStoreImpl.class))
+ .addAll(getModules(CLUSTER_NAME.get(), SERVERSET_PATH.get(), zkClientConfig))
+ .add(new ZooKeeperClientModule(zkClientConfig))
+ .add(configModule)
+ .add(additional)
+ .build();
+ }
+
+ @Override
+ public void run() {
+ if (glogConfig.isPresent()) {
+ // Setup log4j to match our jul glog config in order to pick up zookeeper logging.
+ Log4jConfigurator.configureConsole(glogConfig.get());
+ } else {
+ LOG.warning("Running without expected glog configuration.");
+ }
+
+ LeadershipListener leaderListener = schedulerLifecycle.prepare();
+
+ Optional<InetSocketAddress> primarySocket = serviceRegistry.getPrimarySocket();
+ if (!primarySocket.isPresent()) {
+ throw new IllegalStateException("No primary service registered with LocalServiceRegistry.");
+ }
+
+ try {
+ schedulerService.lead(
+ primarySocket.get(),
+ serviceRegistry.getAuxiliarySockets(),
+ leaderListener);
+ } catch (Group.WatchException e) {
+ throw new IllegalStateException("Failed to watch group and lead service.", e);
+ } catch (Group.JoinException e) {
+ throw new IllegalStateException("Failed to join scheduler service group.", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Interrupted while joining scheduler service group.", e);
+ }
+
+ appLifecycle.awaitShutdown();
+ }
+
+ public static void main(String[] args) {
+ AppLauncher.launch(SchedulerMain.class, args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
new file mode 100644
index 0000000..faf3269
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -0,0 +1,274 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.async;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.logging.Logger;
+
+import javax.inject.Singleton;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.BindingAnnotation;
+import com.google.inject.Key;
+import com.google.inject.PrivateModule;
+import com.google.inject.TypeLiteral;
+
+import com.twitter.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
+import com.twitter.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
+import com.twitter.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl;
+import com.twitter.aurora.scheduler.async.TaskGroups.TaskGroupsSettings;
+import com.twitter.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
+import com.twitter.aurora.scheduler.events.PubsubEventModule;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatImpl;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.util.Random;
+import com.twitter.common.util.TruncatedBinaryBackoff;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import static com.twitter.aurora.scheduler.async.HistoryPruner.PruneThreshold;
+import static com.twitter.aurora.scheduler.async.Preemptor.PreemptorImpl;
+import static com.twitter.aurora.scheduler.async.Preemptor.PreemptorImpl.PreemptionDelay;
+import static com.twitter.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl.ReservationDuration;
+
+/**
+ * Binding module for async task management.
+ */
+public class AsyncModule extends AbstractModule {
+
+ private static final Logger LOG = Logger.getLogger(AsyncModule.class.getName());
+
+ @CmdLine(name = "async_worker_threads",
+ help = "The number of worker threads to process async task operations with.")
+ private static final Arg<Integer> ASYNC_WORKER_THREADS = Arg.create(1);
+
+ @CmdLine(name = "transient_task_state_timeout",
+ help = "The amount of time after which to treat a task stuck in a transient state as LOST.")
+ private static final Arg<Amount<Long, Time>> TRANSIENT_TASK_STATE_TIMEOUT =
+ Arg.create(Amount.of(5L, Time.MINUTES));
+
+ @CmdLine(name = "initial_schedule_delay",
+ help = "Initial amount of time to wait before attempting to schedule a PENDING task.")
+ private static final Arg<Amount<Long, Time>> INITIAL_SCHEDULE_DELAY =
+ Arg.create(Amount.of(1L, Time.SECONDS));
+
+ @CmdLine(name = "max_schedule_delay",
+ help = "Maximum delay between attempts to schedule a PENDING tasks.")
+ private static final Arg<Amount<Long, Time>> MAX_SCHEDULE_DELAY =
+ Arg.create(Amount.of(30L, Time.SECONDS));
+
+ @CmdLine(name = "min_offer_hold_time",
+ help = "Minimum amount of time to hold a resource offer before declining.")
+ private static final Arg<Amount<Integer, Time>> MIN_OFFER_HOLD_TIME =
+ Arg.create(Amount.of(5, Time.MINUTES));
+
+ @CmdLine(name = "history_prune_threshold",
+ help = "Time after which the scheduler will prune terminated task history.")
+ private static final Arg<Amount<Long, Time>> HISTORY_PRUNE_THRESHOLD =
+ Arg.create(Amount.of(2L, Time.DAYS));
+
+ @CmdLine(name = "max_schedule_attempts_per_sec",
+ help = "Maximum number of scheduling attempts to make per second.")
+ private static final Arg<Double> MAX_SCHEDULE_ATTEMPTS_PER_SEC = Arg.create(10D);
+
+ @CmdLine(name = "flapping_task_threshold",
+ help = "A task that repeatedly runs for less than this time is considered to be flapping.")
+ private static final Arg<Amount<Long, Time>> FLAPPING_THRESHOLD =
+ Arg.create(Amount.of(5L, Time.MINUTES));
+
+ @CmdLine(name = "initial_flapping_task_delay",
+ help = "Initial amount of time to wait before attempting to schedule a flapping task.")
+ private static final Arg<Amount<Long, Time>> INITIAL_FLAPPING_DELAY =
+ Arg.create(Amount.of(30L, Time.SECONDS));
+
+ @CmdLine(name = "max_flapping_task_delay",
+ help = "Maximum delay between attempts to schedule a flapping task.")
+ private static final Arg<Amount<Long, Time>> MAX_FLAPPING_DELAY =
+ Arg.create(Amount.of(5L, Time.MINUTES));
+
+ @CmdLine(name = "max_reschedule_task_delay_on_startup",
+ help = "Upper bound of random delay for pending task rescheduling on scheduler startup.")
+ private static final Arg<Amount<Integer, Time>> MAX_RESCHEDULING_DELAY =
+ Arg.create(Amount.of(30, Time.SECONDS));
+
+ @CmdLine(name = "preemption_delay",
+ help = "Time interval after which a pending task becomes eligible to preempt other tasks")
+ private static final Arg<Amount<Long, Time>> PREEMPTION_DELAY =
+ Arg.create(Amount.of(10L, Time.MINUTES));
+
+ @CmdLine(name = "enable_preemptor",
+ help = "Enable the preemptor and preemption")
+ private static final Arg<Boolean> ENABLE_PREEMPTOR = Arg.create(true);
+
+ private static final Preemptor NULL_PREEMPTOR = new Preemptor() {
+ @Override public Optional<String> findPreemptionSlotFor(String taskId) {
+ return Optional.absent();
+ }
+ };
+
+ @CmdLine(name = "offer_reservation_duration", help = "Time to reserve a slave's offers while "
+ + "trying to satisfy a task preempting another.")
+ private static final Arg<Amount<Long, Time>> RESERVATION_DURATION =
+ Arg.create(Amount.of(3L, Time.MINUTES));
+
+ @BindingAnnotation
+ @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+ private @interface PreemptionBinding { }
+
+ @VisibleForTesting
+ static final Key<Preemptor> PREEMPTOR_KEY = Key.get(Preemptor.class, PreemptionBinding.class);
+
+ @Override
+ protected void configure() {
+ // Don't worry about clean shutdown, these can be daemon and cleanup-free.
+ final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
+ ASYNC_WORKER_THREADS.get(),
+ new ThreadFactoryBuilder().setNameFormat("AsyncProcessor-%d").setDaemon(true).build());
+ Stats.exportSize("timeout_queue_size", executor.getQueue());
+ Stats.export(new StatImpl<Long>("async_tasks_completed") {
+ @Override public Long read() {
+ return executor.getCompletedTaskCount();
+ }
+ });
+
+ // AsyncModule itself is not a subclass of PrivateModule because TaskEventModule internally uses
+ // a MultiBinder, which cannot span multiple injectors.
+ binder().install(new PrivateModule() {
+ @Override protected void configure() {
+ bind(new TypeLiteral<Amount<Long, Time>>() { })
+ .toInstance(TRANSIENT_TASK_STATE_TIMEOUT.get());
+ bind(ScheduledExecutorService.class).toInstance(executor);
+
+ bind(TaskTimeout.class).in(Singleton.class);
+ requireBinding(StatsProvider.class);
+ expose(TaskTimeout.class);
+ }
+ });
+ PubsubEventModule.bindSubscriber(binder(), TaskTimeout.class);
+
+ binder().install(new PrivateModule() {
+ @Override protected void configure() {
+ bind(TaskGroupsSettings.class).toInstance(new TaskGroupsSettings(
+ new TruncatedBinaryBackoff(INITIAL_SCHEDULE_DELAY.get(), MAX_SCHEDULE_DELAY.get()),
+ RateLimiter.create(MAX_SCHEDULE_ATTEMPTS_PER_SEC.get())));
+
+ bind(RescheduleCalculatorImpl.RescheduleCalculatorSettings.class)
+ .toInstance(new RescheduleCalculatorImpl.RescheduleCalculatorSettings(
+ new TruncatedBinaryBackoff(INITIAL_FLAPPING_DELAY.get(), MAX_FLAPPING_DELAY.get()),
+ FLAPPING_THRESHOLD.get(),
+ MAX_RESCHEDULING_DELAY.get()));
+
+ bind(RescheduleCalculator.class).to(RescheduleCalculatorImpl.class).in(Singleton.class);
+ if (ENABLE_PREEMPTOR.get()) {
+ bind(PREEMPTOR_KEY).to(PreemptorImpl.class);
+ bind(PreemptorImpl.class).in(Singleton.class);
+ LOG.info("Preemptor Enabled.");
+ } else {
+ bind(PREEMPTOR_KEY).toInstance(NULL_PREEMPTOR);
+ LOG.warning("Preemptor Disabled.");
+ }
+ expose(PREEMPTOR_KEY);
+ bind(new TypeLiteral<Amount<Long, Time>>() {
+ }).annotatedWith(PreemptionDelay.class)
+ .toInstance(PREEMPTION_DELAY.get());
+ bind(TaskGroups.class).in(Singleton.class);
+ expose(TaskGroups.class);
+ }
+ });
+ bindTaskScheduler(binder(), PREEMPTOR_KEY, RESERVATION_DURATION.get());
+ PubsubEventModule.bindSubscriber(binder(), TaskGroups.class);
+
+ binder().install(new PrivateModule() {
+ @Override protected void configure() {
+ bind(OfferReturnDelay.class).to(RandomJitterReturnDelay.class);
+ bind(ScheduledExecutorService.class).toInstance(executor);
+ bind(OfferQueue.class).to(OfferQueueImpl.class);
+ bind(OfferQueueImpl.class).in(Singleton.class);
+ expose(OfferQueue.class);
+ }
+ });
+ PubsubEventModule.bindSubscriber(binder(), OfferQueue.class);
+
+ binder().install(new PrivateModule() {
+ @Override protected void configure() {
+ // TODO(ksweeney): Create a configuration validator module so this can be injected.
+ // TODO(William Farner): Revert this once large task counts is cheap ala hierarchichal store
+ bind(Integer.class).annotatedWith(PruneThreshold.class).toInstance(100);
+ bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(PruneThreshold.class)
+ .toInstance(HISTORY_PRUNE_THRESHOLD.get());
+ bind(ScheduledExecutorService.class).toInstance(executor);
+
+ bind(HistoryPruner.class).in(Singleton.class);
+ expose(HistoryPruner.class);
+ }
+ });
+ PubsubEventModule.bindSubscriber(binder(), HistoryPruner.class);
+ }
+
+ /**
+ * This method exists because we want to test the wiring up of TaskSchedulerImpl class to the
+ * PubSub system in the TaskSchedulerImplTest class. The method has a complex signature because
+ * the binding of the TaskScheduler and friends occurs in a PrivateModule which does not interact
+ * well with the MultiBinder that backs the PubSub system.
+ */
+ @VisibleForTesting
+ static void bindTaskScheduler(
+ Binder binder,
+ final Key<Preemptor> preemptorKey,
+ final Amount<Long, Time> reservationDuration) {
+ binder.install(new PrivateModule() {
+ @Override protected void configure() {
+ bind(Preemptor.class).to(preemptorKey);
+ bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(ReservationDuration.class)
+ .toInstance(reservationDuration);
+ bind(TaskScheduler.class).to(TaskSchedulerImpl.class);
+ bind(TaskSchedulerImpl.class).in(Singleton.class);
+ expose(TaskScheduler.class);
+ }
+ });
+ PubsubEventModule.bindSubscriber(binder, TaskScheduler.class);
+ }
+
+ /**
+ * Returns offers after a random duration within a fixed window.
+ */
+ private static class RandomJitterReturnDelay implements OfferReturnDelay {
+ private static final int JITTER_WINDOW_MS = Amount.of(1, Time.MINUTES).as(Time.MILLISECONDS);
+
+ private final int minHoldTimeMs = MIN_OFFER_HOLD_TIME.get().as(Time.MILLISECONDS);
+ private final Random random = new Random.SystemRandom(new java.util.Random());
+
+ @Override public Amount<Integer, Time> get() {
+ return Amount.of(minHoldTimeMs + random.nextInt(JITTER_WINDOW_MS), Time.MILLISECONDS);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
new file mode 100644
index 0000000..9af6d36
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.async;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.BindingAnnotation;
+
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.state.StateManager;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.Clock;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.scheduler.base.Tasks.LATEST_ACTIVITY;
+import static com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import static com.twitter.aurora.scheduler.events.PubsubEvent.StorageStarted;
+import static com.twitter.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import static com.twitter.aurora.scheduler.events.PubsubEvent.TasksDeleted;
+
+/**
+ * Prunes tasks in a job based on per-job history and an inactive time threshold by observing tasks
+ * transitioning into one of the inactive states.
+ *
+ * <p>Each inactive task gets a delayed deletion scheduled on the executor; in addition, whenever a
+ * job holds more inactive tasks than the per-job history goal, the oldest registered tasks are
+ * deleted immediately.
+ */
+public class HistoryPruner implements EventSubscriber {
+  private static final Logger LOG = Logger.getLogger(HistoryPruner.class.getName());
+
+  @VisibleForTesting
+  static final Query.Builder INACTIVE_QUERY = Query.unscoped().terminal();
+
+  // Inactive task IDs per job.  LinkedHashMultimap keeps insertion order, so the first value for a
+  // job is the one that was registered earliest.
+  private final Multimap<IJobKey, String> tasksByJob =
+      Multimaps.synchronizedSetMultimap(LinkedHashMultimap.<IJobKey, String>create());
+  @VisibleForTesting
+  Multimap<IJobKey, String> getTasksByJob() {
+    return tasksByJob;
+  }
+
+  private final ScheduledExecutorService executor;
+  private final Storage storage;
+  private final StateManager stateManager;
+  private final Clock clock;
+  private final long pruneThresholdMillis;
+  private final int perJobHistoryGoal;
+  // Pending time-based prune for each registered task, keyed by task ID.
+  private final Map<String, Future<?>> taskIdToFuture = Maps.newConcurrentMap();
+
+  @BindingAnnotation
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  public @interface PruneThreshold { }
+
+  /**
+   * Creates a history pruner.
+   *
+   * @param executor Executor used to schedule time-based pruning.
+   * @param storage Storage queried for inactive tasks when storage starts.
+   * @param stateManager State manager used to delete tasks.
+   * @param clock Clock used to compute how long a task has been inactive.
+   * @param inactivePruneThreshold Time after which an inactive task is pruned.
+   * @param perJobHistoryGoal Maximum number of inactive tasks retained per job.
+   */
+  @Inject
+  HistoryPruner(
+      final ScheduledExecutorService executor,
+      final Storage storage,
+      final StateManager stateManager,
+      final Clock clock,
+      @PruneThreshold Amount<Long, Time> inactivePruneThreshold,
+      @PruneThreshold int perJobHistoryGoal) {
+
+    this.executor = checkNotNull(executor);
+    this.storage = checkNotNull(storage);
+    this.stateManager = checkNotNull(stateManager);
+    this.clock = checkNotNull(clock);
+    this.pruneThresholdMillis = inactivePruneThreshold.as(Time.MILLISECONDS);
+    this.perJobHistoryGoal = perJobHistoryGoal;
+  }
+
+  /**
+   * Computes how long to wait before pruning a task whose latest event has the given timestamp.
+   *
+   * @param taskEventTimestampMillis Timestamp of the task event, in milliseconds.
+   * @return The prune threshold minus the time elapsed since the event.  Elapsed time is clamped
+   *     at zero for timestamps ahead of the clock; the result is zero or negative when the
+   *     threshold has already passed.
+   */
+  @VisibleForTesting
+  long calculateTimeout(long taskEventTimestampMillis) {
+    return pruneThresholdMillis - Math.max(0, clock.nowMillis() - taskEventTimestampMillis);
+  }
+
+  /**
+   * When triggered, records an inactive task state change.
+   *
+   * @param change Event when a task changes state.
+   */
+  @Subscribe
+  public void recordStateChange(TaskStateChange change) {
+    if (Tasks.isTerminated(change.getNewState())) {
+      registerInactiveTask(
+          Tasks.SCHEDULED_TO_JOB_KEY.apply(change.getTask()),
+          change.getTaskId(),
+          calculateTimeout(clock.nowMillis()));
+    }
+  }
+
+  /**
+   * When triggered, iterates through inactive tasks in the system and prunes tasks that
+   * exceed the history goal for a job or are beyond the time threshold.
+   *
+   * @param event A new StorageStarted event.
+   */
+  @Subscribe
+  public void storageStarted(StorageStarted event) {
+    for (IScheduledTask task
+        : LATEST_ACTIVITY.sortedCopy(Storage.Util.consistentFetchTasks(storage, INACTIVE_QUERY))) {
+
+      registerInactiveTask(
+          Tasks.SCHEDULED_TO_JOB_KEY.apply(task),
+          Tasks.id(task),
+          calculateTimeout(Iterables.getLast(task.getTaskEvents()).getTimestamp()));
+    }
+  }
+
+  /**
+   * Logs and deletes the given tasks through the state manager.
+   *
+   * @param taskIds IDs of the tasks to delete.
+   */
+  private void deleteTasks(Set<String> taskIds) {
+    LOG.info("Pruning inactive tasks " + taskIds);
+    stateManager.deleteTasks(taskIds);
+  }
+
+  /**
+   * When triggered, removes the tasks scheduled for pruning and cancels any existing future.
+   *
+   * @param event A new TasksDeleted event.
+   */
+  @Subscribe
+  public void tasksDeleted(final TasksDeleted event) {
+    for (IScheduledTask task : event.getTasks()) {
+      String id = Tasks.id(task);
+      tasksByJob.remove(Tasks.SCHEDULED_TO_JOB_KEY.apply(task), id);
+      Future<?> future = taskIdToFuture.remove(id);
+      if (future != null) {
+        future.cancel(false);
+      }
+    }
+  }
+
+  /**
+   * Registers an inactive task for pruning: schedules its time-based deletion and then immediately
+   * deletes the oldest registered tasks of the job while the job exceeds the per-job history goal.
+   *
+   * @param jobKey Job the task belongs to.
+   * @param taskId ID of the inactive task.
+   * @param timeRemaining Delay, in milliseconds, before the task is pruned by age.
+   */
+  private void registerInactiveTask(
+      final IJobKey jobKey,
+      final String taskId,
+      long timeRemaining) {
+
+    LOG.fine("Prune task " + taskId + " in " + timeRemaining + " ms.");
+    // Insert the latest inactive task at the tail.
+    tasksByJob.put(jobKey, taskId);
+    Runnable runnable = new Runnable() {
+      @Override public void run() {
+        LOG.info("Pruning expired inactive task " + taskId);
+        tasksByJob.remove(jobKey, taskId);
+        taskIdToFuture.remove(taskId);
+        deleteTasks(ImmutableSet.of(taskId));
+      }
+    };
+    Future<?> scheduled = executor.schedule(runnable, timeRemaining, TimeUnit.MILLISECONDS);
+    Future<?> superseded = taskIdToFuture.put(taskId, scheduled);
+    if (superseded != null) {
+      // The task was already registered.  Cancel the earlier prune: left alone it would fire a
+      // redundant deletion and drop the map entry that belongs to the newer schedule.
+      superseded.cancel(false);
+    }
+
+    ImmutableSet.Builder<String> pruneTaskIds = ImmutableSet.builder();
+    Collection<String> tasks = tasksByJob.get(jobKey);
+    // From Multimaps javadoc: "It is imperative that the user manually synchronize on the returned
+    // multimap when accessing any of its collection views".
+    synchronized (tasksByJob) {
+      Iterator<String> iterator = tasks.iterator();
+      while (tasks.size() > perJobHistoryGoal) {
+        // Pick oldest task from the head. Guaranteed by LinkedHashMultimap based on insertion
+        // order.
+        String id = iterator.next();
+        iterator.remove();
+        pruneTaskIds.add(id);
+        Future<?> future = taskIdToFuture.remove(id);
+        if (future != null) {
+          future.cancel(false);
+        }
+      }
+    }
+
+    Set<String> ids = pruneTaskIds.build();
+    if (!ids.isEmpty()) {
+      deleteTasks(ids);
+    }
+  }
+}