You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:42 UTC
[49/51] [partial] Rename twitter* and com.twitter to apache and
org.apache directories to preserve all file history before the refactor.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/app/Modules.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/app/Modules.java b/src/main/java/com/twitter/aurora/scheduler/app/Modules.java
deleted file mode 100644
index 72a80e1..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/app/Modules.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.app;
-
-import com.google.inject.Module;
-import com.google.inject.PrivateModule;
-
-/**
- * A utility class for managing guice modules.
- */
-final class Modules {
-
- private Modules() {
- // Utility class
- }
-
- private static Module instantiateModule(final Class<? extends Module> moduleClass) {
- try {
- return moduleClass.newInstance();
- } catch (InstantiationException e) {
- throw new IllegalArgumentException(
- String.format(
- "Failed to instantiate module %s. Are you sure it has a no-arg constructor?",
- moduleClass.getName()),
- e);
- } catch (IllegalAccessException e) {
- throw new IllegalArgumentException(
- String.format(
- "Failed to instantiate module %s. Are you sure it's public?",
- moduleClass.getName()),
- e);
- }
- }
-
- // Defensively wrap each module provided on the command-line in a PrivateModule that only
- // exposes requested classes to ensure that we don't depend on surprise extra bindings across
- // different implementations.
- static Module wrapInPrivateModule(
- Class<? extends Module> moduleClass,
- final Iterable<Class<?>> exposedClasses) {
-
- final Module module = instantiateModule(moduleClass);
- return new PrivateModule() {
- @Override protected void configure() {
- install(module);
- for (Class<?> klass : exposedClasses) {
- expose(klass);
- }
- }
- };
- }
-
- static Module getModule(Class<? extends Module> moduleClass) {
- return instantiateModule(moduleClass);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/app/SchedulerMain.java b/src/main/java/com/twitter/aurora/scheduler/app/SchedulerMain.java
deleted file mode 100644
index 693c364..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/app/SchedulerMain.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.app;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.logging.Logger;
-
-import javax.annotation.Nonnegative;
-import javax.inject.Inject;
-import javax.inject.Singleton;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableList.Builder;
-import com.google.inject.AbstractModule;
-import com.google.inject.Module;
-
-import com.twitter.aurora.auth.CapabilityValidator;
-import com.twitter.aurora.auth.SessionValidator;
-import com.twitter.aurora.auth.UnsecureAuthModule;
-import com.twitter.aurora.scheduler.DriverFactory;
-import com.twitter.aurora.scheduler.DriverFactory.DriverFactoryImpl;
-import com.twitter.aurora.scheduler.MesosTaskFactory.ExecutorConfig;
-import com.twitter.aurora.scheduler.SchedulerLifecycle;
-import com.twitter.aurora.scheduler.cron.CronPredictor;
-import com.twitter.aurora.scheduler.cron.CronScheduler;
-import com.twitter.aurora.scheduler.cron.noop.NoopCronModule;
-import com.twitter.aurora.scheduler.local.IsolatedSchedulerModule;
-import com.twitter.aurora.scheduler.log.mesos.MesosLogStreamModule;
-import com.twitter.aurora.scheduler.storage.backup.BackupModule;
-import com.twitter.aurora.scheduler.storage.log.LogStorage;
-import com.twitter.aurora.scheduler.storage.log.LogStorageModule;
-import com.twitter.aurora.scheduler.storage.log.SnapshotStoreImpl;
-import com.twitter.aurora.scheduler.storage.mem.MemStorageModule;
-import com.twitter.aurora.scheduler.thrift.ThriftConfiguration;
-import com.twitter.aurora.scheduler.thrift.ThriftModule;
-import com.twitter.aurora.scheduler.thrift.auth.ThriftAuthModule;
-import com.twitter.common.application.AbstractApplication;
-import com.twitter.common.application.AppLauncher;
-import com.twitter.common.application.Lifecycle;
-import com.twitter.common.application.modules.HttpModule;
-import com.twitter.common.application.modules.LocalServiceRegistry;
-import com.twitter.common.application.modules.LogModule;
-import com.twitter.common.application.modules.StatsModule;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.args.constraints.NotEmpty;
-import com.twitter.common.args.constraints.NotNull;
-import com.twitter.common.inject.Bindings;
-import com.twitter.common.logging.RootLogConfig;
-import com.twitter.common.zookeeper.Group;
-import com.twitter.common.zookeeper.SingletonService;
-import com.twitter.common.zookeeper.SingletonService.LeadershipListener;
-import com.twitter.common.zookeeper.guice.client.ZooKeeperClientModule;
-import com.twitter.common.zookeeper.guice.client.ZooKeeperClientModule.ClientConfig;
-import com.twitter.common.zookeeper.guice.client.flagged.FlaggedClientConfig;
-
-/**
- * Launcher for the aurora scheduler.
- */
-public class SchedulerMain extends AbstractApplication {
-
- private static final Logger LOG = Logger.getLogger(SchedulerMain.class.getName());
-
- @CmdLine(name = "testing_isolated_scheduler",
- help = "If true, run in a testing mode with the scheduler isolated from other components.")
- private static final Arg<Boolean> ISOLATED_SCHEDULER = Arg.create(false);
-
- @NotNull
- @CmdLine(name = "cluster_name", help = "Name to identify the cluster being served.")
- private static final Arg<String> CLUSTER_NAME = Arg.create();
-
- @NotNull
- @NotEmpty
- @CmdLine(name = "serverset_path", help = "ZooKeeper ServerSet path to register at.")
- private static final Arg<String> SERVERSET_PATH = Arg.create();
-
- @CmdLine(name = "mesos_ssl_keyfile",
- help = "JKS keyfile for operating the Mesos Thrift-over-SSL interface.")
- private static final Arg<File> MESOS_SSL_KEY_FILE = Arg.create();
-
- @Nonnegative
- @CmdLine(name = "thrift_port", help = "Thrift server port.")
- private static final Arg<Integer> THRIFT_PORT = Arg.create(0);
-
- @NotNull
- @CmdLine(name = "thermos_executor_path", help = "Path to the thermos executor launch script.")
- private static final Arg<String> THERMOS_EXECUTOR_PATH = Arg.create();
-
- @CmdLine(name = "auth_module",
- help = "A Guice module to provide auth bindings. NOTE: The default is unsecure.")
- private static final Arg<? extends Class<? extends Module>> AUTH_MODULE =
- Arg.create(UnsecureAuthModule.class);
-
- private static final Iterable<Class<?>> AUTH_MODULE_CLASSES = ImmutableList.<Class<?>>builder()
- .add(SessionValidator.class)
- .add(CapabilityValidator.class)
- .build();
-
- @CmdLine(name = "cron_module",
- help = "A Guice module to provide cron bindings. NOTE: The default is a no-op.")
- private static final Arg<? extends Class<? extends Module>> CRON_MODULE =
- Arg.create(NoopCronModule.class);
-
- private static final Iterable<Class<?>> CRON_MODULE_CLASSES = ImmutableList.<Class<?>>builder()
- .add(CronPredictor.class)
- .add(CronScheduler.class)
- .build();
-
- // TODO(Suman Karumuri): Pass in AUTH and CRON modules as extra modules
- @CmdLine(name = "extra_modules",
- help = "A list of modules that provide additional functionality.")
- private static final Arg<List<Class<? extends Module>>> EXTRA_MODULES =
- Arg.create((List<Class<? extends Module>>) ImmutableList.<Class<? extends Module>>of());
-
- @Inject private SingletonService schedulerService;
- @Inject private LocalServiceRegistry serviceRegistry;
- @Inject private SchedulerLifecycle schedulerLifecycle;
- @Inject private Lifecycle appLifecycle;
- @Inject private Optional<RootLogConfig.Configuration> glogConfig;
-
- private static Iterable<? extends Module> getSystemModules() {
- return ImmutableList.of(
- new LogModule(),
- new HttpModule(),
- new StatsModule()
- );
- }
-
- private static Iterable<? extends Module> getExtraModules() {
- Builder<Module> modules = ImmutableList.builder();
- modules.add(Modules.wrapInPrivateModule(AUTH_MODULE.get(), AUTH_MODULE_CLASSES))
- .add(Modules.wrapInPrivateModule(CRON_MODULE.get(), CRON_MODULE_CLASSES));
-
- for (Class<? extends Module> moduleClass : EXTRA_MODULES.get()) {
- modules.add(Modules.getModule(moduleClass));
- }
-
- return modules.build();
- }
-
- static Iterable<? extends Module> getModules(
- String clusterName,
- String serverSetPath,
- ClientConfig zkClientConfig) {
-
- return ImmutableList.<Module>builder()
- .addAll(getSystemModules())
- .add(new AppModule(clusterName, serverSetPath, zkClientConfig))
- .addAll(getExtraModules())
- .add(new LogStorageModule())
- .add(new MemStorageModule(Bindings.annotatedKeyFactory(LogStorage.WriteBehind.class)))
- .add(new ThriftModule())
- .add(new ThriftAuthModule())
- .build();
- }
-
- @Override
- public Iterable<? extends Module> getModules() {
- Module additional;
- final ClientConfig zkClientConfig = FlaggedClientConfig.create();
- if (ISOLATED_SCHEDULER.get()) {
- additional = new IsolatedSchedulerModule();
- } else {
- // TODO(Kevin Sweeney): Push these bindings down into a "production" module.
- additional = new AbstractModule() {
- @Override protected void configure() {
- bind(DriverFactory.class).to(DriverFactoryImpl.class);
- bind(DriverFactoryImpl.class).in(Singleton.class);
- install(new MesosLogStreamModule(zkClientConfig));
- }
- };
- }
-
- Module configModule = new AbstractModule() {
- @Override protected void configure() {
- bind(ThriftConfiguration.class).toInstance(new ThriftConfiguration() {
- @Override public Optional<InputStream> getSslKeyStream() throws FileNotFoundException {
- if (MESOS_SSL_KEY_FILE.hasAppliedValue()) {
- return Optional.<InputStream>of(new FileInputStream(MESOS_SSL_KEY_FILE.get()));
- } else {
- return Optional.absent();
- }
- }
-
- @Override public int getServingPort() {
- return THRIFT_PORT.get();
- }
- });
- bind(ExecutorConfig.class).toInstance(new ExecutorConfig(THERMOS_EXECUTOR_PATH.get()));
- }
- };
-
- return ImmutableList.<Module>builder()
- .add(new BackupModule(SnapshotStoreImpl.class))
- .addAll(getModules(CLUSTER_NAME.get(), SERVERSET_PATH.get(), zkClientConfig))
- .add(new ZooKeeperClientModule(zkClientConfig))
- .add(configModule)
- .add(additional)
- .build();
- }
-
- @Override
- public void run() {
- if (glogConfig.isPresent()) {
- // Setup log4j to match our jul glog config in order to pick up zookeeper logging.
- Log4jConfigurator.configureConsole(glogConfig.get());
- } else {
- LOG.warning("Running without expected glog configuration.");
- }
-
- LeadershipListener leaderListener = schedulerLifecycle.prepare();
-
- Optional<InetSocketAddress> primarySocket = serviceRegistry.getPrimarySocket();
- if (!primarySocket.isPresent()) {
- throw new IllegalStateException("No primary service registered with LocalServiceRegistry.");
- }
-
- try {
- schedulerService.lead(
- primarySocket.get(),
- serviceRegistry.getAuxiliarySockets(),
- leaderListener);
- } catch (Group.WatchException e) {
- throw new IllegalStateException("Failed to watch group and lead service.", e);
- } catch (Group.JoinException e) {
- throw new IllegalStateException("Failed to join scheduler service group.", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException("Interrupted while joining scheduler service group.", e);
- }
-
- appLifecycle.awaitShutdown();
- }
-
- public static void main(String[] args) {
- AppLauncher.launch(SchedulerMain.class, args);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/AsyncModule.java b/src/main/java/com/twitter/aurora/scheduler/async/AsyncModule.java
deleted file mode 100644
index faf3269..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/async/AsyncModule.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.async;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.logging.Logger;
-
-import javax.inject.Singleton;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.RateLimiter;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.BindingAnnotation;
-import com.google.inject.Key;
-import com.google.inject.PrivateModule;
-import com.google.inject.TypeLiteral;
-
-import com.twitter.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
-import com.twitter.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
-import com.twitter.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl;
-import com.twitter.aurora.scheduler.async.TaskGroups.TaskGroupsSettings;
-import com.twitter.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
-import com.twitter.aurora.scheduler.events.PubsubEventModule;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.StatImpl;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.stats.StatsProvider;
-import com.twitter.common.util.Random;
-import com.twitter.common.util.TruncatedBinaryBackoff;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-import static com.twitter.aurora.scheduler.async.HistoryPruner.PruneThreshold;
-import static com.twitter.aurora.scheduler.async.Preemptor.PreemptorImpl;
-import static com.twitter.aurora.scheduler.async.Preemptor.PreemptorImpl.PreemptionDelay;
-import static com.twitter.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl.ReservationDuration;
-
-/**
- * Binding module for async task management.
- */
-public class AsyncModule extends AbstractModule {
-
- private static final Logger LOG = Logger.getLogger(AsyncModule.class.getName());
-
- @CmdLine(name = "async_worker_threads",
- help = "The number of worker threads to process async task operations with.")
- private static final Arg<Integer> ASYNC_WORKER_THREADS = Arg.create(1);
-
- @CmdLine(name = "transient_task_state_timeout",
- help = "The amount of time after which to treat a task stuck in a transient state as LOST.")
- private static final Arg<Amount<Long, Time>> TRANSIENT_TASK_STATE_TIMEOUT =
- Arg.create(Amount.of(5L, Time.MINUTES));
-
- @CmdLine(name = "initial_schedule_delay",
- help = "Initial amount of time to wait before attempting to schedule a PENDING task.")
- private static final Arg<Amount<Long, Time>> INITIAL_SCHEDULE_DELAY =
- Arg.create(Amount.of(1L, Time.SECONDS));
-
- @CmdLine(name = "max_schedule_delay",
- help = "Maximum delay between attempts to schedule a PENDING tasks.")
- private static final Arg<Amount<Long, Time>> MAX_SCHEDULE_DELAY =
- Arg.create(Amount.of(30L, Time.SECONDS));
-
- @CmdLine(name = "min_offer_hold_time",
- help = "Minimum amount of time to hold a resource offer before declining.")
- private static final Arg<Amount<Integer, Time>> MIN_OFFER_HOLD_TIME =
- Arg.create(Amount.of(5, Time.MINUTES));
-
- @CmdLine(name = "history_prune_threshold",
- help = "Time after which the scheduler will prune terminated task history.")
- private static final Arg<Amount<Long, Time>> HISTORY_PRUNE_THRESHOLD =
- Arg.create(Amount.of(2L, Time.DAYS));
-
- @CmdLine(name = "max_schedule_attempts_per_sec",
- help = "Maximum number of scheduling attempts to make per second.")
- private static final Arg<Double> MAX_SCHEDULE_ATTEMPTS_PER_SEC = Arg.create(10D);
-
- @CmdLine(name = "flapping_task_threshold",
- help = "A task that repeatedly runs for less than this time is considered to be flapping.")
- private static final Arg<Amount<Long, Time>> FLAPPING_THRESHOLD =
- Arg.create(Amount.of(5L, Time.MINUTES));
-
- @CmdLine(name = "initial_flapping_task_delay",
- help = "Initial amount of time to wait before attempting to schedule a flapping task.")
- private static final Arg<Amount<Long, Time>> INITIAL_FLAPPING_DELAY =
- Arg.create(Amount.of(30L, Time.SECONDS));
-
- @CmdLine(name = "max_flapping_task_delay",
- help = "Maximum delay between attempts to schedule a flapping task.")
- private static final Arg<Amount<Long, Time>> MAX_FLAPPING_DELAY =
- Arg.create(Amount.of(5L, Time.MINUTES));
-
- @CmdLine(name = "max_reschedule_task_delay_on_startup",
- help = "Upper bound of random delay for pending task rescheduling on scheduler startup.")
- private static final Arg<Amount<Integer, Time>> MAX_RESCHEDULING_DELAY =
- Arg.create(Amount.of(30, Time.SECONDS));
-
- @CmdLine(name = "preemption_delay",
- help = "Time interval after which a pending task becomes eligible to preempt other tasks")
- private static final Arg<Amount<Long, Time>> PREEMPTION_DELAY =
- Arg.create(Amount.of(10L, Time.MINUTES));
-
- @CmdLine(name = "enable_preemptor",
- help = "Enable the preemptor and preemption")
- private static final Arg<Boolean> ENABLE_PREEMPTOR = Arg.create(true);
-
- private static final Preemptor NULL_PREEMPTOR = new Preemptor() {
- @Override public Optional<String> findPreemptionSlotFor(String taskId) {
- return Optional.absent();
- }
- };
-
- @CmdLine(name = "offer_reservation_duration", help = "Time to reserve a slave's offers while "
- + "trying to satisfy a task preempting another.")
- private static final Arg<Amount<Long, Time>> RESERVATION_DURATION =
- Arg.create(Amount.of(3L, Time.MINUTES));
-
- @BindingAnnotation
- @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
- private @interface PreemptionBinding { }
-
- @VisibleForTesting
- static final Key<Preemptor> PREEMPTOR_KEY = Key.get(Preemptor.class, PreemptionBinding.class);
-
- @Override
- protected void configure() {
- // Don't worry about clean shutdown, these can be daemon and cleanup-free.
- final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
- ASYNC_WORKER_THREADS.get(),
- new ThreadFactoryBuilder().setNameFormat("AsyncProcessor-%d").setDaemon(true).build());
- Stats.exportSize("timeout_queue_size", executor.getQueue());
- Stats.export(new StatImpl<Long>("async_tasks_completed") {
- @Override public Long read() {
- return executor.getCompletedTaskCount();
- }
- });
-
- // AsyncModule itself is not a subclass of PrivateModule because TaskEventModule internally uses
- // a MultiBinder, which cannot span multiple injectors.
- binder().install(new PrivateModule() {
- @Override protected void configure() {
- bind(new TypeLiteral<Amount<Long, Time>>() { })
- .toInstance(TRANSIENT_TASK_STATE_TIMEOUT.get());
- bind(ScheduledExecutorService.class).toInstance(executor);
-
- bind(TaskTimeout.class).in(Singleton.class);
- requireBinding(StatsProvider.class);
- expose(TaskTimeout.class);
- }
- });
- PubsubEventModule.bindSubscriber(binder(), TaskTimeout.class);
-
- binder().install(new PrivateModule() {
- @Override protected void configure() {
- bind(TaskGroupsSettings.class).toInstance(new TaskGroupsSettings(
- new TruncatedBinaryBackoff(INITIAL_SCHEDULE_DELAY.get(), MAX_SCHEDULE_DELAY.get()),
- RateLimiter.create(MAX_SCHEDULE_ATTEMPTS_PER_SEC.get())));
-
- bind(RescheduleCalculatorImpl.RescheduleCalculatorSettings.class)
- .toInstance(new RescheduleCalculatorImpl.RescheduleCalculatorSettings(
- new TruncatedBinaryBackoff(INITIAL_FLAPPING_DELAY.get(), MAX_FLAPPING_DELAY.get()),
- FLAPPING_THRESHOLD.get(),
- MAX_RESCHEDULING_DELAY.get()));
-
- bind(RescheduleCalculator.class).to(RescheduleCalculatorImpl.class).in(Singleton.class);
- if (ENABLE_PREEMPTOR.get()) {
- bind(PREEMPTOR_KEY).to(PreemptorImpl.class);
- bind(PreemptorImpl.class).in(Singleton.class);
- LOG.info("Preemptor Enabled.");
- } else {
- bind(PREEMPTOR_KEY).toInstance(NULL_PREEMPTOR);
- LOG.warning("Preemptor Disabled.");
- }
- expose(PREEMPTOR_KEY);
- bind(new TypeLiteral<Amount<Long, Time>>() {
- }).annotatedWith(PreemptionDelay.class)
- .toInstance(PREEMPTION_DELAY.get());
- bind(TaskGroups.class).in(Singleton.class);
- expose(TaskGroups.class);
- }
- });
- bindTaskScheduler(binder(), PREEMPTOR_KEY, RESERVATION_DURATION.get());
- PubsubEventModule.bindSubscriber(binder(), TaskGroups.class);
-
- binder().install(new PrivateModule() {
- @Override protected void configure() {
- bind(OfferReturnDelay.class).to(RandomJitterReturnDelay.class);
- bind(ScheduledExecutorService.class).toInstance(executor);
- bind(OfferQueue.class).to(OfferQueueImpl.class);
- bind(OfferQueueImpl.class).in(Singleton.class);
- expose(OfferQueue.class);
- }
- });
- PubsubEventModule.bindSubscriber(binder(), OfferQueue.class);
-
- binder().install(new PrivateModule() {
- @Override protected void configure() {
- // TODO(ksweeney): Create a configuration validator module so this can be injected.
- // TODO(William Farner): Revert this once large task counts is cheap ala hierarchichal store
- bind(Integer.class).annotatedWith(PruneThreshold.class).toInstance(100);
- bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(PruneThreshold.class)
- .toInstance(HISTORY_PRUNE_THRESHOLD.get());
- bind(ScheduledExecutorService.class).toInstance(executor);
-
- bind(HistoryPruner.class).in(Singleton.class);
- expose(HistoryPruner.class);
- }
- });
- PubsubEventModule.bindSubscriber(binder(), HistoryPruner.class);
- }
-
- /**
- * This method exists because we want to test the wiring up of TaskSchedulerImpl class to the
- * PubSub system in the TaskSchedulerImplTest class. The method has a complex signature because
- * the binding of the TaskScheduler and friends occurs in a PrivateModule which does not interact
- * well with the MultiBinder that backs the PubSub system.
- */
- @VisibleForTesting
- static void bindTaskScheduler(
- Binder binder,
- final Key<Preemptor> preemptorKey,
- final Amount<Long, Time> reservationDuration) {
- binder.install(new PrivateModule() {
- @Override protected void configure() {
- bind(Preemptor.class).to(preemptorKey);
- bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(ReservationDuration.class)
- .toInstance(reservationDuration);
- bind(TaskScheduler.class).to(TaskSchedulerImpl.class);
- bind(TaskSchedulerImpl.class).in(Singleton.class);
- expose(TaskScheduler.class);
- }
- });
- PubsubEventModule.bindSubscriber(binder, TaskScheduler.class);
- }
-
- /**
- * Returns offers after a random duration within a fixed window.
- */
- private static class RandomJitterReturnDelay implements OfferReturnDelay {
- private static final int JITTER_WINDOW_MS = Amount.of(1, Time.MINUTES).as(Time.MILLISECONDS);
-
- private final int minHoldTimeMs = MIN_OFFER_HOLD_TIME.get().as(Time.MILLISECONDS);
- private final Random random = new Random.SystemRandom(new java.util.Random());
-
- @Override public Amount<Integer, Time> get() {
- return Amount.of(minHoldTimeMs + random.nextInt(JITTER_WINDOW_MS), Time.MILLISECONDS);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/async/HistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/HistoryPruner.java b/src/main/java/com/twitter/aurora/scheduler/async/HistoryPruner.java
deleted file mode 100644
index 9af6d36..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/async/HistoryPruner.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.async;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.LinkedHashMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.eventbus.Subscribe;
-import com.google.inject.BindingAnnotation;
-
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.state.StateManager;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.entities.IJobKey;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.Clock;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static com.twitter.aurora.scheduler.base.Tasks.LATEST_ACTIVITY;
-import static com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import static com.twitter.aurora.scheduler.events.PubsubEvent.StorageStarted;
-import static com.twitter.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import static com.twitter.aurora.scheduler.events.PubsubEvent.TasksDeleted;
-
-/**
- * Prunes tasks in a job based on per-job history and an inactive time threshold by observing tasks
- * transitioning into one of the inactive states.
- */
-public class HistoryPruner implements EventSubscriber {
- private static final Logger LOG = Logger.getLogger(HistoryPruner.class.getName());
-
- @VisibleForTesting
- static final Query.Builder INACTIVE_QUERY = Query.unscoped().terminal();
-
- private final Multimap<IJobKey, String> tasksByJob =
- Multimaps.synchronizedSetMultimap(LinkedHashMultimap.<IJobKey, String>create());
- @VisibleForTesting
- Multimap<IJobKey, String> getTasksByJob() {
- return tasksByJob;
- }
-
- private final ScheduledExecutorService executor;
- private final Storage storage;
- private final StateManager stateManager;
- private final Clock clock;
- private final long pruneThresholdMillis;
- private final int perJobHistoryGoal;
- private final Map<String, Future<?>> taskIdToFuture = Maps.newConcurrentMap();
-
- @BindingAnnotation
- @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
- public @interface PruneThreshold { }
-
- @Inject
- HistoryPruner(
- final ScheduledExecutorService executor,
- final Storage storage,
- final StateManager stateManager,
- final Clock clock,
- @PruneThreshold Amount<Long, Time> inactivePruneThreshold,
- @PruneThreshold int perJobHistoryGoal) {
-
- this.executor = checkNotNull(executor);
- this.storage = checkNotNull(storage);
- this.stateManager = checkNotNull(stateManager);
- this.clock = checkNotNull(clock);
- this.pruneThresholdMillis = inactivePruneThreshold.as(Time.MILLISECONDS);
- this.perJobHistoryGoal = perJobHistoryGoal;
- }
-
- @VisibleForTesting
- long calculateTimeout(long taskEventTimestampMillis) {
- return pruneThresholdMillis - Math.max(0, clock.nowMillis() - taskEventTimestampMillis);
- }
-
- /**
- * When triggered, records an inactive task state change.
- *
- * @param change Event when a task changes state.
- */
- @Subscribe
- public void recordStateChange(TaskStateChange change) {
- if (Tasks.isTerminated(change.getNewState())) {
- registerInactiveTask(
- Tasks.SCHEDULED_TO_JOB_KEY.apply(change.getTask()),
- change.getTaskId(),
- calculateTimeout(clock.nowMillis()));
- }
- }
-
- /**
- * When triggered, iterates through inactive tasks in the system and prunes tasks that
- * exceed the history goal for a job or are beyond the time threshold.
- *
- * @param event A new StorageStarted event.
- */
- @Subscribe
- public void storageStarted(StorageStarted event) {
- for (IScheduledTask task
- : LATEST_ACTIVITY.sortedCopy(Storage.Util.consistentFetchTasks(storage, INACTIVE_QUERY))) {
-
- registerInactiveTask(
- Tasks.SCHEDULED_TO_JOB_KEY.apply(task),
- Tasks.id(task),
- calculateTimeout(Iterables.getLast(task.getTaskEvents()).getTimestamp()));
- }
- }
-
- private void deleteTasks(Set<String> taskIds) {
- LOG.info("Pruning inactive tasks " + taskIds);
- stateManager.deleteTasks(taskIds);
- }
-
- /**
- * When triggered, removes the tasks scheduled for pruning and cancels any existing future.
- *
- * @param event A new TasksDeleted event.
- */
- @Subscribe
- public void tasksDeleted(final TasksDeleted event) {
- for (IScheduledTask task : event.getTasks()) {
- String id = Tasks.id(task);
- tasksByJob.remove(Tasks.SCHEDULED_TO_JOB_KEY.apply(task), id);
- Future<?> future = taskIdToFuture.remove(id);
- if (future != null) {
- future.cancel(false);
- }
- }
- }
-
- private void registerInactiveTask(
- final IJobKey jobKey,
- final String taskId,
- long timeRemaining) {
-
- LOG.fine("Prune task " + taskId + " in " + timeRemaining + " ms.");
- // Insert the latest inactive task at the tail.
- tasksByJob.put(jobKey, taskId);
- Runnable runnable = new Runnable() {
- @Override public void run() {
- LOG.info("Pruning expired inactive task " + taskId);
- tasksByJob.remove(jobKey, taskId);
- taskIdToFuture.remove(taskId);
- deleteTasks(ImmutableSet.of(taskId));
- }
- };
- taskIdToFuture.put(taskId, executor.schedule(runnable, timeRemaining, TimeUnit.MILLISECONDS));
-
- ImmutableSet.Builder<String> pruneTaskIds = ImmutableSet.builder();
- Collection<String> tasks = tasksByJob.get(jobKey);
- // From Multimaps javadoc: "It is imperative that the user manually synchronize on the returned
- // multimap when accessing any of its collection views".
- synchronized (tasksByJob) {
- Iterator<String> iterator = tasks.iterator();
- while (tasks.size() > perJobHistoryGoal) {
- // Pick oldest task from the head. Guaranteed by LinkedHashMultimap based on insertion
- // order.
- String id = iterator.next();
- iterator.remove();
- pruneTaskIds.add(id);
- Future<?> future = taskIdToFuture.remove(id);
- if (future != null) {
- future.cancel(false);
- }
- }
- }
-
- Set<String> ids = pruneTaskIds.build();
- if (!ids.isEmpty()) {
- deleteTasks(ids);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/async/OfferQueue.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/OfferQueue.java b/src/main/java/com/twitter/aurora/scheduler/async/OfferQueue.java
deleted file mode 100644
index 95334ff..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/async/OfferQueue.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.async;
-
-import java.util.Comparator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Function;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Supplier;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-import com.google.common.eventbus.Subscribe;
-
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.TaskInfo;
-
-import com.twitter.aurora.gen.HostStatus;
-import com.twitter.aurora.gen.MaintenanceMode;
-import com.twitter.aurora.scheduler.Driver;
-import com.twitter.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
-import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import com.twitter.aurora.scheduler.events.PubsubEvent.HostMaintenanceStateChange;
-import com.twitter.aurora.scheduler.state.MaintenanceController;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.Stats;
-
-import static com.twitter.aurora.gen.MaintenanceMode.DRAINED;
-import static com.twitter.aurora.gen.MaintenanceMode.DRAINING;
-import static com.twitter.aurora.gen.MaintenanceMode.NONE;
-import static com.twitter.aurora.gen.MaintenanceMode.SCHEDULED;
-
-/**
- * Tracks the Offers currently known by the scheduler.
- */
-public interface OfferQueue extends EventSubscriber {
-
- /**
- * Notifies the scheduler of a new resource offer.
- *
- * @param offer Newly-available resource offer.
- */
- void addOffer(Offer offer);
-
- /**
- * Invalidates an offer. This indicates that the scheduler should not attempt to match any
- * tasks against the offer.
- *
- * @param offer Canceled offer.
- */
- void cancelOffer(OfferID offer);
-
- /**
- * Launches the first task that satisfies the {@code acceptor} by returning a {@link TaskInfo}.
- *
- * @param acceptor Function that determines if an offer is accepted.
- * @return {@code true} if the task was launched, {@code false} if no offers satisfied the
- * {@code acceptor}.
- * @throws LaunchException If the acceptor accepted an offer, but there was an error launching the
- * task.
- */
- boolean launchFirst(Function<Offer, Optional<TaskInfo>> acceptor) throws LaunchException;
-
- /**
- * Notifies the offer queue that a host has changed state.
- *
- * @param change State change notification.
- */
- void hostChangedState(HostMaintenanceStateChange change);
-
- /**
- * Gets the offers that the scheduler is holding.
- *
- * @return A snapshot of the offers that the scheduler is currently holding.
- */
- Iterable<Offer> getOffers();
-
- /**
- * Calculates the amount of time before an offer should be 'returned' by declining it.
- * The delay is calculated for each offer that is received, so the return delay may be
- * fixed or variable.
- */
- public interface OfferReturnDelay extends Supplier<Amount<Integer, Time>> {
- }
-
- /**
- * Thrown when there was an unexpected failure trying to launch a task.
- */
- static class LaunchException extends Exception {
- LaunchException(String msg) {
- super(msg);
- }
-
- LaunchException(String msg, Throwable cause) {
- super(msg, cause);
- }
- }
-
- class OfferQueueImpl implements OfferQueue {
- private static final Logger LOG = Logger.getLogger(OfferQueueImpl.class.getName());
-
- static final Comparator<HostOffer> PREFERENCE_COMPARATOR =
- // Currently, the only preference is based on host maintenance status.
- Ordering.explicit(NONE, SCHEDULED, DRAINING, DRAINED)
- .onResultOf(new Function<HostOffer, MaintenanceMode>() {
- @Override public MaintenanceMode apply(HostOffer offer) {
- return offer.mode;
- }
- })
- .compound(Ordering.arbitrary());
-
- private final Set<HostOffer> hostOffers = new ConcurrentSkipListSet<>(PREFERENCE_COMPARATOR);
- private final AtomicLong offerRaces = Stats.exportLong("offer_accept_races");
-
- private final Driver driver;
- private final OfferReturnDelay returnDelay;
- private final ScheduledExecutorService executor;
- private final MaintenanceController maintenance;
-
- @Inject
- OfferQueueImpl(Driver driver,
- OfferReturnDelay returnDelay,
- ScheduledExecutorService executor,
- MaintenanceController maintenance) {
-
- this.driver = driver;
- this.returnDelay = returnDelay;
- this.executor = executor;
- this.maintenance = maintenance;
- // Potential gotcha - since this is now a ConcurrentSkipListSet, size() is more expensive.
- // Could track this separately if it turns out to pose problems.
- Stats.exportSize("outstanding_offers", hostOffers);
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * If no offer from the same slave is currently held, the offer is stored together with the
- * host's maintenance mode and a task is scheduled to return it after the configured return
- * delay. Otherwise the new offer and every held offer from that slave are declined.
- */
- @Override
- public void addOffer(final Offer offer) {
- // We run a slight risk of a race here, which is acceptable. The worst case is that we
- // temporarily hold two offers for the same host, which should be corrected when we return
- // them after the return delay.
- // There's also a chance that we return an offer for compaction ~simultaneously with the
- // same-host offer being canceled/returned. This is also fine.
- List<HostOffer> sameSlave = FluentIterable.from(hostOffers)
- .filter(new Predicate<HostOffer>() {
- @Override public boolean apply(HostOffer hostOffer) {
- return hostOffer.offer.getSlaveId().equals(offer.getSlaveId());
- }
- })
- .toList();
- if (sameSlave.isEmpty()) {
- hostOffers.add(new HostOffer(offer, maintenance.getMode(offer.getHostname())));
- // removeAndDecline only declines when the offer is still held, so an offer that was
- // launched or canceled in the meantime is not declined by this timer.
- executor.schedule(
- new Runnable() {
- @Override public void run() {
- removeAndDecline(offer.getId());
- }
- },
- returnDelay.get().as(Time.MILLISECONDS),
- TimeUnit.MILLISECONDS);
- } else {
- // If there are existing offers for the slave, decline all of them so the master can
- // compact all of those offers into a single offer and send them back.
- LOG.info("Returning " + (sameSlave.size() + 1)
- + " offers for " + offer.getSlaveId().getValue() + " for compaction.");
- // The new offer was never stored, so it is declined directly rather than removed first.
- decline(offer.getId());
- for (HostOffer sameSlaveOffer : sameSlave) {
- removeAndDecline(sameSlaveOffer.offer.getId());
- }
- }
- }
-
- void removeAndDecline(OfferID id) {
- if (removeFromHostOffers(id)) {
- decline(id);
- }
- }
-
- void decline(OfferID id) {
- LOG.fine("Declining offer " + id);
- driver.declineOffer(id);
- }
-
- @Override
- public void cancelOffer(final OfferID offerId) {
- removeFromHostOffers(offerId);
- }
-
- private boolean removeFromHostOffers(final OfferID offerId) {
- Preconditions.checkNotNull(offerId);
-
- // The small risk of inconsistency is acceptable here - if we have an accept/remove race
- // on an offer, the master will mark the task as LOST and it will be retried.
- return Iterables.removeIf(hostOffers,
- new Predicate<HostOffer>() {
- @Override public boolean apply(HostOffer input) {
- return input.offer.getId().equals(offerId);
- }
- });
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * The held offers are copied into an immutable list, so the result is a true snapshot: later
- * additions to or removals from the queue are not reflected in the returned iterable.
- */
- @Override
- public Iterable<Offer> getOffers() {
- return FluentIterable.from(hostOffers)
- .transform(new Function<HostOffer, Offer>() {
- @Override public Offer apply(HostOffer hostOffer) {
- return hostOffer.offer;
- }
- })
- .toList();
- }
-
- /**
- * Updates the preference of a host's offers.
- * <p>
- * Each held offer for the host is removed and re-added with the new maintenance mode so that
- * it is re-sorted. An offer is only re-added when this call actually removed it; an offer
- * that was concurrently canceled, declined or launched is therefore not put back in the queue.
- *
- * @param change Host change notification.
- */
- @Subscribe
- @Override
- public void hostChangedState(HostMaintenanceStateChange change) {
- final HostStatus hostStatus = change.getStatus();
-
- // Snapshot the host's offers first; the set cannot be re-sorted in place.
- Set<HostOffer> changedOffers = FluentIterable.from(hostOffers)
- .filter(new Predicate<HostOffer>() {
- @Override public boolean apply(HostOffer hostOffer) {
- return hostOffer.offer.getHostname().equals(hostStatus.getHost());
- }
- })
- .toSet();
-
- for (HostOffer staleOffer : changedOffers) {
- // remove() returning false means another thread already took the offer out of the queue,
- // so re-adding it would resurrect an offer that is no longer valid.
- if (hostOffers.remove(staleOffer)) {
- hostOffers.add(new HostOffer(staleOffer.offer, hostStatus.getMode()));
- }
- }
- }
-
- /**
- * Notifies the queue that the driver is disconnected, and all the stored offers are now
- * invalid.
- * <p>
- * The queue takes this as a signal to flush its queue.
- *
- * @param event Disconnected event.
- */
- @Subscribe
- public void driverDisconnected(DriverDisconnected event) {
- LOG.info("Clearing stale offers since the driver is disconnected.");
- hostOffers.clear();
- }
-
- /**
- * Encapsulate an offer from a host, and the host's maintenance mode.
- */
- private static class HostOffer {
- private final Offer offer;
- private final MaintenanceMode mode;
-
- HostOffer(Offer offer, MaintenanceMode mode) {
- this.offer = offer;
- this.mode = mode;
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof HostOffer)) {
- return false;
- }
- HostOffer other = (HostOffer) o;
- return Objects.equal(offer, other.offer) && (mode == other.mode);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(offer, mode);
- }
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Offers are visited in the iteration order of {@code hostOffers}. The first offer for which
- * the acceptor returns a task is removed from the queue and launched; no further offers are
- * tried after that, whether the launch succeeds or fails.
- */
- @Override
- public boolean launchFirst(Function<Offer, Optional<TaskInfo>> acceptor)
- throws LaunchException {
-
- // It's important that this method is not called concurrently - doing so would open up the
- // possibility of a race between the same offers being accepted by different threads.
-
- for (HostOffer hostOffer : hostOffers) {
- Optional<TaskInfo> assignment = acceptor.apply(hostOffer.offer);
- if (assignment.isPresent()) {
- // Guard against an offer being removed after we grabbed it from the iterator.
- // If that happens, the offer will not exist in hostOffers, and we can immediately
- // send it back to LOST for quick reschedule.
- if (hostOffers.remove(hostOffer)) {
- try {
- driver.launchTask(hostOffer.offer.getId(), assignment.get());
- return true;
- } catch (IllegalStateException e) {
- // TODO(William Farner): Catch only the checked exception produced by Driver
- // once it changes from throwing IllegalStateException when the driver is not yet
- // registered.
- throw new LaunchException("Failed to launch task.", e);
- }
- } else {
- // Count the lost race so it is visible in the exported "offer_accept_races" stat.
- offerRaces.incrementAndGet();
- throw new LaunchException(
- "Accepted offer no longer exists in offer queue, likely data race.");
- }
- }
- }
-
- // No held offer satisfied the acceptor.
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/Preemptor.java b/src/main/java/com/twitter/aurora/scheduler/async/Preemptor.java
deleted file mode 100644
index a01790c..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/async/Preemptor.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.async;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.Ordering;
-import com.google.common.collect.Sets;
-import com.google.inject.BindingAnnotation;
-
-import com.twitter.aurora.scheduler.ResourceSlot;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.ScheduleException;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.filter.SchedulingFilter;
-import com.twitter.aurora.scheduler.state.SchedulerCore;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.util.Clock;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-import org.apache.mesos.Protos.Offer;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static com.twitter.aurora.gen.ScheduleStatus.PENDING;
-import static com.twitter.aurora.gen.ScheduleStatus.PREEMPTING;
-import static com.twitter.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
-
-/**
- * Preempts active tasks in favor of higher priority tasks.
- */
-public interface Preemptor {
-
- /**
- * Preempts active tasks in favor of the input task.
- *
- * @param taskId ID of the preempting task.
- * @return ID of the slave where preemption occurred.
- */
- Optional<String> findPreemptionSlotFor(String taskId);
-
- /**
- * A task preemptor that tries to find tasks that are waiting to be scheduled, which are of higher
- * priority than tasks that are currently running.
- *
- * To avoid excessive churn, the preemptor requires that a task is PENDING for a duration
- * (dictated by {@link #preemptionCandidacyDelay}) before it becomes eligible to preempt other
- * tasks.
- */
- class PreemptorImpl implements Preemptor {
-
- /**
- * Binding annotation for the time interval after which a pending task becomes eligible to
- * preempt other tasks.
- */
- @BindingAnnotation
- @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
- @interface PreemptionDelay { }
-
- @VisibleForTesting
- static final Query.Builder CANDIDATE_QUERY = Query.statusScoped(
- EnumSet.copyOf(Sets.difference(Tasks.ACTIVE_STATES, EnumSet.of(PENDING, PREEMPTING))));
-
- private static final Logger LOG = Logger.getLogger(PreemptorImpl.class.getName());
-
- private static final Function<IAssignedTask, Integer> GET_PRIORITY =
- new Function<IAssignedTask, Integer>() {
- @Override public Integer apply(IAssignedTask task) {
- return task.getTask().getPriority();
- }
- };
-
- private final AtomicLong tasksPreempted = Stats.exportLong("preemptor_tasks_preempted");
- private final AtomicLong failedPreemptions = Stats.exportLong("preemptor_failed_preemptions");
- // Incremented every time the preemptor is invoked for a pending task and preemptable tasks exist.
- private final AtomicLong attemptedPreemptions = Stats.exportLong("preemptor_attempts");
- // Incremented every time we fail to find tasks to preempt for a pending task.
- private final AtomicLong noSlotsFound = Stats.exportLong("preemptor_no_slots_found");
-
- private final Predicate<IScheduledTask> isIdleTask = new Predicate<IScheduledTask>() {
- @Override public boolean apply(IScheduledTask task) {
- return (clock.nowMillis() - Iterables.getLast(task.getTaskEvents()).getTimestamp())
- >= preemptionCandidacyDelay.as(Time.MILLISECONDS);
- }
- };
-
- private final Storage storage;
- private final SchedulerCore scheduler;
- private final OfferQueue offerQueue;
- private final SchedulingFilter schedulingFilter;
- private final Amount<Long, Time> preemptionCandidacyDelay;
- private final Clock clock;
-
- /**
- * Creates a new preemptor.
- *
- * @param storage Backing store for tasks.
- * @param scheduler Scheduler to fetch task information from, and instruct when preempting
- * tasks.
- * @param offerQueue Queue that contains available Mesos resource offers.
- * @param schedulingFilter Filter to identify whether tasks may reside on given slaves.
- * @param preemptionCandidacyDelay Time a task must be PENDING before it may preempt other
- * tasks.
- * @param clock Clock to check current time.
- */
- @Inject
- PreemptorImpl(
- Storage storage,
- SchedulerCore scheduler,
- OfferQueue offerQueue,
- SchedulingFilter schedulingFilter,
- @PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay,
- Clock clock) {
-
- this.storage = checkNotNull(storage);
- this.scheduler = checkNotNull(scheduler);
- this.offerQueue = checkNotNull(offerQueue);
- this.schedulingFilter = checkNotNull(schedulingFilter);
- this.preemptionCandidacyDelay = checkNotNull(preemptionCandidacyDelay);
- this.clock = checkNotNull(clock);
- }
-
- private List<IAssignedTask> fetch(Query.Builder query, Predicate<IScheduledTask> filter) {
- return Lists.newArrayList(Iterables.transform(Iterables.filter(
- Storage.Util.consistentFetchTasks(storage, query), filter),
- SCHEDULED_TO_ASSIGNED));
- }
-
- private List<IAssignedTask> fetch(Query.Builder query) {
- return fetch(query, Predicates.<IScheduledTask>alwaysTrue());
- }
-
- private static final Function<IAssignedTask, String> TASK_TO_SLAVE_ID =
- new Function<IAssignedTask, String>() {
- @Override public String apply(IAssignedTask input) {
- return input.getSlaveId();
- }
- };
-
- private static final Function<IAssignedTask, String> TASK_TO_HOST =
- new Function<IAssignedTask, String>() {
- @Override public String apply(IAssignedTask input) {
- return input.getSlaveHost();
- }
- };
-
- private static Predicate<IAssignedTask> canPreempt(final IAssignedTask pending) {
- return new Predicate<IAssignedTask>() {
- @Override public boolean apply(IAssignedTask possibleVictim) {
- return preemptionFilter(possibleVictim).apply(pending);
- }
- };
- }
-
- private static final Function<IAssignedTask, ResourceSlot> TASK_TO_RESOURCES =
- new Function<IAssignedTask, ResourceSlot>() {
- @Override public ResourceSlot apply(IAssignedTask input) {
- return ResourceSlot.from(input.getTask());
- }
- };
-
- private static final Function<Offer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
- new Function<Offer, ResourceSlot>() {
- @Override public ResourceSlot apply(Offer offer) {
- return ResourceSlot.from(offer);
- }
- };
-
- private static final Function<Offer, String> OFFER_TO_HOST =
- new Function<Offer, String>() {
- @Override public String apply(Offer offer) {
- return offer.getHostname();
- }
- };
-
- // TODO(zmanji) Consider using Dominant Resource Fairness for ordering instead of the vector
- // ordering
- private static final Ordering<IAssignedTask> RESOURCE_ORDER =
- ResourceSlot.ORDER.onResultOf(TASK_TO_RESOURCES).reverse();
-
- /**
- * Determines which tasks, if any, must be preempted on a single host to fit the pending task.
- * <p>
- * Optional.absent indicates that this slave does not have enough resources to satisfy the task.
- * The empty set indicates the offers (slack) are enough.
- * A set with elements indicates those tasks and the offers are enough.
- *
- * @param possibleVictims Active tasks on the host that may be candidates for preemption.
- * @param offers Offers currently held for the same host.
- * @param pendingTask Task that needs to be scheduled.
- * @return The tasks to preempt, as described above.
- */
- private Optional<Set<IAssignedTask>> getTasksToPreempt(
- Iterable<IAssignedTask> possibleVictims,
- Iterable<Offer> offers,
- IAssignedTask pendingTask) {
-
- // This enforces the precondition that all of the resources are from the same host. We need to
- // get the host for the schedulingFilter.
- Set<String> hosts = ImmutableSet.<String>builder()
- .addAll(Iterables.transform(possibleVictims, TASK_TO_HOST))
- .addAll(Iterables.transform(offers, OFFER_TO_HOST)).build();
-
- // getOnlyElement throws unless exactly one distinct host was found.
- String host = Iterables.getOnlyElement(hosts);
-
- ResourceSlot slackResources =
- ResourceSlot.sum(Iterables.transform(offers, OFFER_TO_RESOURCE_SLOT));
-
- // First check whether the held offers alone satisfy the task, without preempting anything.
- if (!Iterables.isEmpty(offers)) {
- Set<SchedulingFilter.Veto> vetos = schedulingFilter.filter(
- slackResources,
- host,
- pendingTask.getTask(),
- pendingTask.getTaskId());
-
- if (vetos.isEmpty()) {
- return Optional.<Set<IAssignedTask>>of(ImmutableSet.<IAssignedTask>of());
- }
- }
-
- FluentIterable<IAssignedTask> preemptableTasks =
- FluentIterable.from(possibleVictims).filter(canPreempt(pendingTask));
-
- if (preemptableTasks.isEmpty()) {
- return Optional.absent();
- }
-
- List<IAssignedTask> toPreemptTasks = Lists.newArrayList();
-
- Iterable<IAssignedTask> sortedVictims = RESOURCE_ORDER.immutableSortedCopy(preemptableTasks);
-
- // Add victims one at a time, in RESOURCE_ORDER, until the freed resources plus the slack
- // pass the scheduling filter.
- for (IAssignedTask victim : sortedVictims) {
- toPreemptTasks.add(victim);
-
- ResourceSlot totalResource = ResourceSlot.sum(
- ResourceSlot.sum(Iterables.transform(toPreemptTasks, TASK_TO_RESOURCES)),
- slackResources);
-
- Set<SchedulingFilter.Veto> vetos = schedulingFilter.filter(
- totalResource,
- host,
- pendingTask.getTask(),
- pendingTask.getTaskId());
-
- if (vetos.isEmpty()) {
- return Optional.<Set<IAssignedTask>>of(ImmutableSet.copyOf(toPreemptTasks));
- }
- }
- // Even preempting every eligible victim does not satisfy the filter.
- return Optional.absent();
- }
-
- private static final Function<Offer, String> OFFER_TO_SLAVE_ID =
- new Function<Offer, String>() {
- @Override public String apply(Offer offer) {
- return offer.getSlaveId().getValue();
- }
- };
-
- private Multimap<String, IAssignedTask> getSlavesToActiveTasks() {
- // Only non-pending active tasks may be preempted.
- List<IAssignedTask> activeTasks = fetch(CANDIDATE_QUERY);
-
- // Walk through the preemption candidates in reverse scheduling order.
- Collections.sort(activeTasks, Tasks.SCHEDULING_ORDER.reverse());
-
- // Group the tasks by slave id so they can be paired with offers from the same slave.
- return Multimaps.index(activeTasks, TASK_TO_SLAVE_ID);
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Returns absent when the task is not PENDING, has not been idle for the preemption candidacy
- * delay, or no slave can make room for it. A slave ID may be returned without any task being
- * preempted when the offers held for that slave already satisfy the task.
- */
- @Override
- public synchronized Optional<String> findPreemptionSlotFor(String taskId) {
- List<IAssignedTask> pendingTasks =
- fetch(Query.statusScoped(PENDING).byId(taskId), isIdleTask);
-
- // Task is no longer PENDING no need to preempt
- if (pendingTasks.isEmpty()) {
- return Optional.absent();
- }
-
- IAssignedTask pendingTask = Iterables.getOnlyElement(pendingTasks);
-
- Multimap<String, IAssignedTask> slavesToActiveTasks = getSlavesToActiveTasks();
-
- // Without any candidate tasks this does not count as a preemption attempt.
- if (slavesToActiveTasks.isEmpty()) {
- return Optional.absent();
- }
-
- attemptedPreemptions.incrementAndGet();
-
- // Group the offers by slave id so they can be paired with active tasks from the same slave.
- Multimap<String, Offer> slavesToOffers =
- Multimaps.index(offerQueue.getOffers(), OFFER_TO_SLAVE_ID);
-
- Set<String> allSlaves = ImmutableSet.<String>builder()
- .addAll(slavesToOffers.keySet())
- .addAll(slavesToActiveTasks.keySet())
- .build();
-
- for (String slaveID : allSlaves) {
- Optional<Set<IAssignedTask>> toPreemptTasks = getTasksToPreempt(
- slavesToActiveTasks.get(slaveID),
- slavesToOffers.get(slaveID),
- pendingTask);
-
- if (toPreemptTasks.isPresent()) {
- try {
- for (IAssignedTask toPreempt : toPreemptTasks.get()) {
- scheduler.preemptTask(toPreempt, pendingTask);
- tasksPreempted.incrementAndGet();
- }
- return Optional.of(slaveID);
- } catch (ScheduleException e) {
- // Victims preempted before the failure are not rolled back; the search simply moves on
- // to the next slave.
- LOG.log(Level.SEVERE, "Preemption failed", e);
- failedPreemptions.incrementAndGet();
- }
- }
- }
-
- noSlotsFound.incrementAndGet();
- return Optional.absent();
- }
-
- private static final Predicate<IAssignedTask> IS_PRODUCTION =
- Predicates.compose(Tasks.IS_PRODUCTION, Tasks.ASSIGNED_TO_INFO);
-
- /**
- * Creates a static filter that will identify tasks that may preempt the provided task.
- * A task may preempt {@code preemptableTask} if either of the following conditions holds:
- * - {@code preemptableTask} is non-production and the compared task is production.
- * - The tasks are owned by the same role and the priority of the compared task is strictly
- * greater than the priority of {@code preemptableTask}.
- * Resource requirements are not examined by this filter.
- *
- * @param preemptableTask Task to possibly preempt.
- * @return A filter that will compare the production flag, role and priority of other tasks
- * with {@code preemptableTask}.
- */
- private static Predicate<IAssignedTask> preemptionFilter(IAssignedTask preemptableTask) {
- // Constant predicate capturing whether the potential victim itself is a production task.
- Predicate<IAssignedTask> preemptableIsProduction = preemptableTask.getTask().isProduction()
- ? Predicates.<IAssignedTask>alwaysTrue()
- : Predicates.<IAssignedTask>alwaysFalse();
-
- Predicate<IAssignedTask> priorityFilter =
- greaterPriorityFilter(GET_PRIORITY.apply(preemptableTask));
- return Predicates.or(
- Predicates.and(Predicates.not(preemptableIsProduction), IS_PRODUCTION),
- Predicates.and(isOwnedBy(getRole(preemptableTask)), priorityFilter)
- );
- }
-
- private static Predicate<IAssignedTask> isOwnedBy(final String role) {
- return new Predicate<IAssignedTask>() {
- @Override public boolean apply(IAssignedTask task) {
- return getRole(task).equals(role);
- }
- };
- }
-
- private static String getRole(IAssignedTask task) {
- return task.getTask().getOwner().getRole();
- }
-
- private static Predicate<Integer> greaterThan(final int value) {
- return new Predicate<Integer>() {
- @Override public boolean apply(Integer input) {
- return input > value;
- }
- };
- }
-
- private static Predicate<IAssignedTask> greaterPriorityFilter(int priority) {
- return Predicates.compose(greaterThan(priority), GET_PRIORITY);
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java b/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java
deleted file mode 100644
index eefc03a..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.async;
-
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Set;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-import com.twitter.aurora.gen.ScheduleStatus;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.aurora.scheduler.storage.entities.ITaskEvent;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.BackoffStrategy;
-import com.twitter.common.util.Clock;
-import com.twitter.common.util.Random;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static com.twitter.aurora.gen.ScheduleStatus.KILLING;
-import static com.twitter.aurora.gen.ScheduleStatus.RESTARTING;
-
-/**
- * Calculates scheduling delays for tasks.
- */
-interface RescheduleCalculator {
- /**
- * Gets a timestamp for the task to become eligible for (re)scheduling at scheduler startup.
- *
- * @param task Task to calculate timestamp for.
- * @return Timestamp in msec.
- */
- long getStartupReadyTimeMs(IScheduledTask task);
-
- /**
- * Gets a timestamp for the task to become eligible for (re)scheduling.
- *
- * @param task Task to calculate timestamp for.
- * @return Timestamp in msec.
- */
- long getReadyTimeMs(IScheduledTask task);
-
- class RescheduleCalculatorImpl implements RescheduleCalculator {
-
- private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName());
-
- private final Storage storage;
- private final RescheduleCalculatorSettings settings;
- private final Clock clock;
- private final Random random = new Random.SystemRandom(new java.util.Random());
-
- private static final Predicate<ScheduleStatus> IS_ACTIVE_STATUS =
- Predicates.in(Tasks.ACTIVE_STATES);
-
- private static final Function<ITaskEvent, ScheduleStatus> TO_STATUS =
- new Function<ITaskEvent, ScheduleStatus>() {
- @Override public ScheduleStatus apply(ITaskEvent input) {
- return input.getStatus();
- }
- };
-
- private static final Set<ScheduleStatus> INTERRUPTED_TASK_STATES =
- EnumSet.of(RESTARTING, KILLING);
-
- private final Predicate<IScheduledTask> flapped = new Predicate<IScheduledTask>() {
- @Override public boolean apply(IScheduledTask task) {
- if (!task.isSetTaskEvents()) {
- return false;
- }
-
- List<ITaskEvent> events = Lists.reverse(task.getTaskEvents());
-
- // Avoid penalizing tasks that were interrupted by outside action, such as a user
- // restarting them.
- if (Iterables.any(Iterables.transform(events, TO_STATUS),
- Predicates.in(INTERRUPTED_TASK_STATES))) {
- return false;
- }
-
- ITaskEvent terminalEvent = Iterables.get(events, 0);
- ScheduleStatus terminalState = terminalEvent.getStatus();
- Preconditions.checkState(Tasks.isTerminated(terminalState));
-
- ITaskEvent activeEvent =
- Iterables.find(events, Predicates.compose(IS_ACTIVE_STATUS, TO_STATUS));
-
- long thresholdMs = settings.flappingTaskThreashold.as(Time.MILLISECONDS);
-
- return (terminalEvent.getTimestamp() - activeEvent.getTimestamp()) < thresholdMs;
- }
- };
-
- static class RescheduleCalculatorSettings {
- private final BackoffStrategy flappingTaskBackoff;
- private final Amount<Long, Time> flappingTaskThreashold;
- private final Amount<Integer, Time> maxStartupRescheduleDelay;
-
- RescheduleCalculatorSettings(
- BackoffStrategy flappingTaskBackoff,
- Amount<Long, Time> flappingTaskThreashold,
- Amount<Integer, Time> maxStartupRescheduleDelay) {
-
- this.flappingTaskBackoff = checkNotNull(flappingTaskBackoff);
- this.flappingTaskThreashold = checkNotNull(flappingTaskThreashold);
- this.maxStartupRescheduleDelay = checkNotNull(maxStartupRescheduleDelay);
- }
- }
-
- @Inject
- RescheduleCalculatorImpl(
- Storage storage,
- RescheduleCalculatorSettings settings,
- Clock clock) {
-
- this.storage = checkNotNull(storage);
- this.settings = checkNotNull(settings);
- this.clock = checkNotNull(clock);
- }
-
- @Override
- public long getStartupReadyTimeMs(IScheduledTask task) {
- return random.nextInt(settings.maxStartupRescheduleDelay.as(Time.MILLISECONDS))
- + getTaskReadyTimestamp(task);
- }
-
- @Override
- public long getReadyTimeMs(IScheduledTask task) {
- return getTaskReadyTimestamp(task);
- }
-
- private Optional<IScheduledTask> getTaskAncestor(IScheduledTask task) {
- if (!task.isSetAncestorId()) {
- return Optional.absent();
- }
-
- ImmutableSet<IScheduledTask> res =
- Storage.Util.weaklyConsistentFetchTasks(storage, Query.taskScoped(task.getAncestorId()));
-
- return Optional.fromNullable(Iterables.getOnlyElement(res, null));
- }
-
- private long getTaskReadyTimestamp(IScheduledTask task) {
- Optional<IScheduledTask> curTask = getTaskAncestor(task);
- long penaltyMs = 0;
- while (curTask.isPresent() && flapped.apply(curTask.get())) {
- LOG.info(
- String.format("Ancestor of %s flapped: %s", Tasks.id(task), Tasks.id(curTask.get())));
- long newPenalty = settings.flappingTaskBackoff.calculateBackoffMs(penaltyMs);
- // If the backoff strategy is truncated then there is no need for us to continue.
- if (newPenalty == penaltyMs) {
- break;
- }
- penaltyMs = newPenalty;
- curTask = getTaskAncestor(curTask.get());
- }
-
- return penaltyMs + clock.nowMillis();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/async/TaskGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/TaskGroup.java b/src/main/java/com/twitter/aurora/scheduler/async/TaskGroup.java
deleted file mode 100644
index 018022b..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/async/TaskGroup.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.async;
-
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-
-import com.twitter.aurora.scheduler.async.TaskGroups.GroupKey;
-import com.twitter.common.base.Function;
-import com.twitter.common.util.BackoffStrategy;
-
-/**
- * A group of task IDs that are eligible for scheduling, but may be waiting for a backoff to expire.
- */
-class TaskGroup {
- private final GroupKey key;
- private final BackoffStrategy backoffStrategy;
-
- private static final Function<Task, Long> TO_TIMESTAMP = new Function<Task, Long>() {
- @Override public Long apply(Task item) {
- return item.readyTimestampMs;
- }
- };
-
- // Order the tasks by the time they are ready to be scheduled
- private static final Ordering<Task> TASK_ORDERING = Ordering.natural().onResultOf(TO_TIMESTAMP);
- // 11 is PriorityBlockingQueue's default initial capacity, required here to supply a comparator.
- private final Queue<Task> tasks = new PriorityBlockingQueue<>(11, TASK_ORDERING);
- // Penalty for the task group for failing to schedule.
- private final AtomicLong penaltyMs;
-
- TaskGroup(GroupKey key, BackoffStrategy backoffStrategy) {
- this.key = key;
- this.backoffStrategy = backoffStrategy;
- penaltyMs = new AtomicLong();
- resetPenaltyAndGet();
- }
-
- GroupKey getKey() {
- return key;
- }
-
- private static final Function<Task, String> TO_TASK_ID =
- new Function<Task, String>() {
- @Override public String apply(Task item) {
- return item.taskId;
- }
- };
-
- /**
- * Removes the task at the head of the queue.
- *
- * @return The ID of the head task.
- * @throws IllegalStateException if the queue is empty.
- */
- String pop() throws IllegalStateException {
- Task head = tasks.poll();
- Preconditions.checkState(head != null);
- return head.taskId;
- }
-
- void remove(String taskId) {
- Iterables.removeIf(tasks, Predicates.compose(Predicates.equalTo(taskId), TO_TASK_ID));
- }
-
- void push(final String taskId, long readyTimestamp) {
- tasks.offer(new Task(taskId, readyTimestamp));
- }
-
- synchronized long resetPenaltyAndGet() {
- penaltyMs.set(backoffStrategy.calculateBackoffMs(0));
- return getPenaltyMs();
- }
-
- synchronized long penalizeAndGet() {
- penaltyMs.set(backoffStrategy.calculateBackoffMs(getPenaltyMs()));
- return getPenaltyMs();
- }
-
- GroupState isReady(long nowMs) {
- Task task = tasks.peek();
- if (task == null) {
- return GroupState.EMPTY;
- }
-
- if (task.readyTimestampMs > nowMs) {
- return GroupState.NOT_READY;
- }
- return GroupState.READY;
- }
- // Begin methods used for debug interfaces.
-
- public String getName() {
- return key.toString();
- }
-
- // TODO(zmanji): Return Task instances here. Can use them to display flapping penalty on web UI.
- public Set<String> getTaskIds() {
- return ImmutableSet.copyOf(Iterables.transform(tasks, TO_TASK_ID));
- }
-
- public long getPenaltyMs() {
- return penaltyMs.get();
- }
-
- private static class Task {
- private final String taskId;
- private final long readyTimestampMs;
-
- Task(String taskId, long readyTimestampMs) {
- this.taskId = Preconditions.checkNotNull(taskId);
- this.readyTimestampMs = readyTimestampMs;
- }
- }
-
- enum GroupState {
- EMPTY, // The group is empty.
- NOT_READY, // Every task in the group is not ready yet.
- READY // The task at the head of the queue is ready.
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/async/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/TaskGroups.java b/src/main/java/com/twitter/aurora/scheduler/async/TaskGroups.java
deleted file mode 100644
index a59e5c8..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/async/TaskGroups.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.async;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Objects;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.eventbus.Subscribe;
-import com.google.common.util.concurrent.RateLimiter;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import com.twitter.aurora.scheduler.base.JobKeys;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import com.twitter.aurora.scheduler.events.PubsubEvent.StorageStarted;
-import com.twitter.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import com.twitter.aurora.scheduler.events.PubsubEvent.TasksDeleted;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-import com.twitter.common.application.ShutdownRegistry;
-import com.twitter.common.base.Command;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.util.BackoffStrategy;
-import com.twitter.common.util.Clock;
-import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static com.twitter.aurora.gen.ScheduleStatus.PENDING;
-import static com.twitter.aurora.scheduler.async.TaskGroup.GroupState;
-
-/**
- * A collection of task groups, where a task group is a collection of tasks that are known to be
- * equal in the way they schedule. A group is expected to hold tasks associated with the same job
- * key that also have equal (per {@code equals()}) {@link ITaskConfig} values.
- * <p>
- * This is used to prevent redundant work in trying to schedule tasks as well as to provide
- * nearly-equal responsiveness when scheduling across jobs. In other words, a 1000 instance job
- * cannot starve a 1 instance job.
- */
-public class TaskGroups implements EventSubscriber {
-
- private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName());
-
- private final Storage storage;
- private final LoadingCache<GroupKey, TaskGroup> groups;
- private final Clock clock;
- private final RescheduleCalculator rescheduleCalculator;
-
- static class TaskGroupsSettings {
- private final BackoffStrategy taskGroupBackoff;
- private final RateLimiter rateLimiter;
-
- TaskGroupsSettings(BackoffStrategy taskGroupBackoff, RateLimiter rateLimiter) {
- this.taskGroupBackoff = checkNotNull(taskGroupBackoff);
- this.rateLimiter = checkNotNull(rateLimiter);
- }
- }
-
- @Inject
- TaskGroups(
- ShutdownRegistry shutdownRegistry,
- Storage storage,
- TaskGroupsSettings settings,
- TaskScheduler taskScheduler,
- Clock clock,
- RescheduleCalculator rescheduleCalculator) {
-
- this(
- createThreadPool(shutdownRegistry),
- storage,
- settings.taskGroupBackoff,
- settings.rateLimiter,
- taskScheduler,
- clock,
- rescheduleCalculator);
- }
-
- TaskGroups(
- final ScheduledExecutorService executor,
- final Storage storage,
- final BackoffStrategy taskGroupBackoffStrategy,
- final RateLimiter rateLimiter,
- final TaskScheduler taskScheduler,
- final Clock clock,
- final RescheduleCalculator rescheduleCalculator) {
-
- this.storage = checkNotNull(storage);
- checkNotNull(executor);
- checkNotNull(taskGroupBackoffStrategy);
- checkNotNull(rateLimiter);
- checkNotNull(taskScheduler);
- this.clock = checkNotNull(clock);
- this.rescheduleCalculator = checkNotNull(rescheduleCalculator);
-
- final TaskScheduler ratelLimitedScheduler = new TaskScheduler() {
- @Override public TaskSchedulerResult schedule(String taskId) {
- rateLimiter.acquire();
- return taskScheduler.schedule(taskId);
- }
- };
-
- groups = CacheBuilder.newBuilder().build(new CacheLoader<GroupKey, TaskGroup>() {
- @Override public TaskGroup load(GroupKey key) {
- TaskGroup group = new TaskGroup(key, taskGroupBackoffStrategy);
- LOG.info("Evaluating group " + key + " in " + group.getPenaltyMs() + " ms");
- startGroup(group, executor, ratelLimitedScheduler);
- return group;
- }
- });
- }
-
- private synchronized boolean maybeInvalidate(TaskGroup group) {
- if (group.getTaskIds().isEmpty()) {
- groups.invalidate(group.getKey());
- return true;
- }
- return false;
- }
-
- private void startGroup(
- final TaskGroup group,
- final ScheduledExecutorService executor,
- final TaskScheduler taskScheduler) {
-
- Runnable monitor = new Runnable() {
- @Override public void run() {
- GroupState state = group.isReady(clock.nowMillis());
-
- switch (state) {
- case EMPTY:
- maybeInvalidate(group);
- break;
-
- case READY:
- String id = group.pop();
- TaskScheduler.TaskSchedulerResult result = taskScheduler.schedule(id);
- switch (result) {
- case SUCCESS:
- if (!maybeInvalidate(group)) {
- executor.schedule(this, group.resetPenaltyAndGet(), TimeUnit.MILLISECONDS);
- }
- break;
-
- case TRY_AGAIN:
- group.push(id, clock.nowMillis());
- executor.schedule(this, group.penalizeAndGet(), TimeUnit.MILLISECONDS);
- break;
-
- default:
- throw new IllegalStateException("Unknown TaskSchedulerResult " + result);
- }
- break;
-
- case NOT_READY:
- executor.schedule(this, group.getPenaltyMs(), TimeUnit.MILLISECONDS);
- break;
-
- default:
- throw new IllegalStateException("Unknown GroupState " + state);
- }
- }
- };
- executor.schedule(monitor, group.getPenaltyMs(), TimeUnit.MILLISECONDS);
- }
-
- private static ScheduledExecutorService createThreadPool(ShutdownRegistry shutdownRegistry) {
- // TODO(William Farner): Leverage ExceptionHandlingScheduledExecutorService:
- // com.twitter.common.util.concurrent.ExceptionHandlingScheduledExecutorService
- final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
- 1,
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TaskScheduler-%d").build());
- Stats.exportSize("schedule_queue_size", executor.getQueue());
- shutdownRegistry.addAction(new Command() {
- @Override public void execute() {
- new ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute();
- }
- });
- return executor;
- }
-
- private synchronized void add(IAssignedTask task, long readyTimestamp) {
- groups.getUnchecked(new GroupKey(task.getTask())).push(task.getTaskId(), readyTimestamp);
- }
-
- /**
- * Informs the task groups of a task state change.
- * <p>
- * This is used to observe {@link com.twitter.aurora.gen.ScheduleStatus#PENDING} tasks and begin
- * attempting to schedule them.
- *
- * @param stateChange State change notification.
- */
- @Subscribe
- public synchronized void taskChangedState(TaskStateChange stateChange) {
- if (stateChange.getNewState() == PENDING) {
- add(
- stateChange.getTask().getAssignedTask(),
- rescheduleCalculator.getReadyTimeMs(stateChange.getTask()));
- }
- }
-
- /**
- * Signals that storage has started and is consistent.
- * <p>
- * Upon this signal, all {@link com.twitter.aurora.gen.ScheduleStatus#PENDING} tasks in the
- * storage will become eligible for scheduling.
- *
- * @param event Storage started notification.
- */
- @Subscribe
- public void storageStarted(StorageStarted event) {
- for (IScheduledTask task
- : Storage.Util.consistentFetchTasks(storage, Query.unscoped().byStatus(PENDING))) {
-
- add(task.getAssignedTask(), rescheduleCalculator.getStartupReadyTimeMs(task));
- }
- }
-
- /**
- * Signals the scheduler that tasks have been deleted.
- *
- * @param deleted Tasks deleted event.
- */
- @Subscribe
- public synchronized void tasksDeleted(TasksDeleted deleted) {
- for (IAssignedTask task
- : Iterables.transform(deleted.getTasks(), Tasks.SCHEDULED_TO_ASSIGNED)) {
- TaskGroup group = groups.getIfPresent(new GroupKey(task.getTask()));
- if (group != null) {
- group.remove(task.getTaskId());
- }
- }
- }
-
- public Iterable<TaskGroup> getGroups() {
- return ImmutableSet.copyOf(groups.asMap().values());
- }
-
- static class GroupKey {
- private final ITaskConfig canonicalTask;
-
- GroupKey(ITaskConfig task) {
- this.canonicalTask = task;
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(canonicalTask);
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof GroupKey)) {
- return false;
- }
- GroupKey other = (GroupKey) o;
- return Objects.equal(canonicalTask, other.canonicalTask);
- }
-
- @Override
- public String toString() {
- return JobKeys.toPath(Tasks.INFO_TO_JOB_KEY.apply(canonicalTask));
- }
- }
-}