You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ke...@apache.org on 2014/11/21 19:36:12 UTC
incubator-aurora git commit: Replace bindStartupAction with Service registration
Repository: incubator-aurora
Updated Branches:
refs/heads/master ada97bdbb -> b6217df13
Replace bindStartupAction with Service registration
This attempts to kill LifecycleModule in favor of service registration.
Testing Done:
./gradlew -Pq build
Bugs closed: AURORA-920
Reviewed at https://reviews.apache.org/r/27757/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/b6217df1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/b6217df1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/b6217df1
Branch: refs/heads/master
Commit: b6217df13b5615e4c76835a3ed94e3e9a89a22a2
Parents: ada97bd
Author: Kevin Sweeney <ke...@apache.org>
Authored: Fri Nov 21 10:35:11 2014 -0800
Committer: Kevin Sweeney <ke...@apache.org>
Committed: Fri Nov 21 10:35:39 2014 -0800
----------------------------------------------------------------------
.../org/apache/aurora/scheduler/AppStartup.java | 33 +++++
.../aurora/scheduler/SchedulerModule.java | 30 +----
.../scheduler/SchedulerServicesModule.java | 123 +++++++++++++++++++
.../apache/aurora/scheduler/app/AppModule.java | 4 +-
.../aurora/scheduler/async/AsyncModule.java | 21 ++--
.../scheduler/async/JobUpdateHistoryPruner.java | 11 +-
.../scheduler/cron/quartz/CronModule.java | 4 +-
.../scheduler/events/PubsubEventModule.java | 14 ++-
.../scheduler/http/JettyServerModule.java | 15 ++-
.../apache/aurora/scheduler/sla/SlaModule.java | 4 +-
.../scheduler/stats/AsyncStatsModule.java | 106 +++++++++-------
.../scheduler/stats/TaskStatCalculator.java | 3 +-
.../aurora/scheduler/async/AsyncModuleTest.java | 11 +-
.../async/JobUpdateHistoryPrunerTest.java | 2 +-
.../aurora/scheduler/async/KillRetryTest.java | 6 +-
.../scheduler/http/JettyServerModuleTest.java | 2 +
.../aurora/scheduler/state/PubsubTestUtil.java | 14 +--
17 files changed, 294 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b6217df1/src/main/java/org/apache/aurora/scheduler/AppStartup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/AppStartup.java b/src/main/java/org/apache/aurora/scheduler/AppStartup.java
new file mode 100644
index 0000000..67759fd
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/AppStartup.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import javax.inject.Qualifier;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Qualifier for services that should be started as soon as possible, regardless of
+ * whether or not this scheduler is the leader.
+ */
+@Qualifier
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER})
+@VisibleForTesting
+public @interface AppStartup { }
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b6217df1/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
index 72d3d60..d3ac176 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
@@ -14,30 +14,21 @@
package org.apache.aurora.scheduler;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Logger;
import javax.inject.Singleton;
import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.Service;
-import com.google.common.util.concurrent.ServiceManager;
import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
import com.google.inject.PrivateModule;
import com.google.inject.Provides;
-import com.google.inject.binder.LinkedBindingBuilder;
-import com.google.inject.multibindings.Multibinder;
import com.twitter.common.args.Arg;
import com.twitter.common.args.CmdLine;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
-import org.apache.aurora.GuavaUtils;
-import org.apache.aurora.GuavaUtils.ServiceManagerIface;
import org.apache.aurora.scheduler.SchedulerLifecycle.LeadingOptions;
-import org.apache.aurora.scheduler.SchedulerLifecycle.SchedulerActive;
import org.apache.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
import org.apache.aurora.scheduler.async.GcExecutorLauncher;
import org.apache.aurora.scheduler.base.AsyncUtil;
@@ -83,7 +74,7 @@ public class SchedulerModule extends AbstractModule {
PubsubEventModule.bindSubscriber(binder(), SchedulerLifecycle.class);
bind(TaskVars.class).in(Singleton.class);
PubsubEventModule.bindSubscriber(binder(), TaskVars.class);
- addSchedulerActiveServiceBinding(binder()).to(TaskVars.class);
+ SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(TaskVars.class);
}
@Provides
@@ -95,23 +86,4 @@ public class SchedulerModule extends AbstractModule {
return ImmutableList.of(gcLauncher, userTaskLauncher);
}
- /**
- * Register a Service to run after storage is ready, but before the scheduler has announced
- * leadership. If this service fails to startup the scheduler will abort.
- *
- * Usage: {@code addSchedulerActiveServiceBinding(binder()).to(YourService.class)}.
- *
- * @param binder Binder for the current non-private module.
- * @return a linked binding builder with the normal Guice EDSL methods.
- */
- public static LinkedBindingBuilder<Service> addSchedulerActiveServiceBinding(Binder binder) {
- return Multibinder.newSetBinder(binder, Service.class, SchedulerActive.class).addBinding();
- }
-
- @Provides
- @Singleton
- @SchedulerActive
- ServiceManagerIface provideSchedulerActiveServiceManager(@SchedulerActive Set<Service> services) {
- return GuavaUtils.serviceManager(new ServiceManager(services));
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b6217df1/src/main/java/org/apache/aurora/scheduler/SchedulerServicesModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerServicesModule.java b/src/main/java/org/apache/aurora/scheduler/SchedulerServicesModule.java
new file mode 100644
index 0000000..ad2284f
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerServicesModule.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+import com.google.common.util.concurrent.Service;
+import com.google.common.util.concurrent.ServiceManager;
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.Provides;
+import com.google.inject.binder.LinkedBindingBuilder;
+import com.google.inject.multibindings.Multibinder;
+import com.twitter.common.application.ShutdownRegistry;
+import com.twitter.common.application.modules.LifecycleModule;
+import com.twitter.common.base.Command;
+import com.twitter.common.base.ExceptionalCommand;
+
+import org.apache.aurora.GuavaUtils;
+import org.apache.aurora.GuavaUtils.ServiceManagerIface;
+import org.apache.aurora.scheduler.SchedulerLifecycle.SchedulerActive;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Coordinates scheduler startup.
+ */
+public class SchedulerServicesModule extends AbstractModule {
+ /**
+ * Register a Service to run as close as possible to app startup.
+ *
+ * Usage: {@code addAppStartupServiceBinding(binder()).to(YourService.class)}.
+ *
+ * @param binder Binder for the current non-private module.
+ * @return a linked binding builder with the normal Guice EDSL methods.
+ */
+ public static LinkedBindingBuilder<Service> addAppStartupServiceBinding(Binder binder) {
+ return Multibinder.newSetBinder(binder, Service.class, AppStartup.class).addBinding();
+ }
+
+ /**
+ * Register a Service to run after storage is ready, but before the scheduler has announced
+ * leadership. If this service fails to start up, the scheduler will abort.
+ *
+ * Usage: {@code addSchedulerActiveServiceBinding(binder()).to(YourService.class)}.
+ *
+ * @param binder Binder for the current non-private module.
+ * @return a linked binding builder with the normal Guice EDSL methods.
+ */
+ public static LinkedBindingBuilder<Service> addSchedulerActiveServiceBinding(Binder binder) {
+ return Multibinder.newSetBinder(binder, Service.class, SchedulerActive.class).addBinding();
+ }
+
+ @Override
+ protected void configure() {
+ LifecycleModule.bindStartupAction(binder(), ServiceManagerAdapterCommand.class);
+
+ // Ensure at least an empty binding is present for each set of services.
+ Multibinder.newSetBinder(binder(), Service.class, AppStartup.class);
+ Multibinder.newSetBinder(binder(), Service.class, SchedulerActive.class);
+ }
+
+ /**
+ * Adapter to make twitter.common.application startup call into Guava's ServiceManager.
+ */
+ @Singleton
+ static class ServiceManagerAdapterCommand implements Command {
+ private final ShutdownRegistry shutdownRegistry;
+ private final ServiceManagerIface serviceManager;
+
+ @Inject
+ ServiceManagerAdapterCommand(
+ ShutdownRegistry shutdownRegistry,
+ @AppStartup final ServiceManagerIface serviceManager) {
+
+ this.shutdownRegistry = requireNonNull(shutdownRegistry);
+ this.serviceManager = requireNonNull(serviceManager);
+ }
+
+ @Override
+ public void execute() {
+ serviceManager.startAsync();
+ shutdownRegistry.addAction(new ExceptionalCommand<TimeoutException>() {
+ @Override
+ public void execute() throws TimeoutException {
+ serviceManager.stopAsync();
+ serviceManager.awaitStopped(5L, TimeUnit.SECONDS);
+ }
+ });
+ serviceManager.awaitHealthy();
+ }
+ }
+
+ @Provides
+ @Singleton
+ @AppStartup
+ ServiceManagerIface provideAppStartupServiceManager(@AppStartup Set<Service> services) {
+ return GuavaUtils.serviceManager(new ServiceManager(services));
+ }
+
+ @Provides
+ @Singleton
+ @SchedulerActive
+ ServiceManagerIface provideSchedulerActiveServiceManager(@SchedulerActive Set<Service> services) {
+ return GuavaUtils.serviceManager(new ServiceManager(services));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b6217df1/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
index 19bf162..6f1cf47 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
@@ -45,6 +45,7 @@ import com.twitter.thrift.ServiceInstance;
import org.apache.aurora.GuiceUtils;
import org.apache.aurora.gen.ServerInfo;
import org.apache.aurora.scheduler.SchedulerModule;
+import org.apache.aurora.scheduler.SchedulerServicesModule;
import org.apache.aurora.scheduler.async.AsyncModule;
import org.apache.aurora.scheduler.events.PubsubEventModule;
import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
@@ -69,7 +70,7 @@ import static org.apache.aurora.gen.apiConstants.THRIFT_API_VERSION;
/**
* Binding module for the aurora scheduler application.
*/
-class AppModule extends AbstractModule {
+public class AppModule extends AbstractModule {
private static final Logger LOG = Logger.getLogger(AppModule.class.getName());
private final String clusterName;
@@ -118,6 +119,7 @@ class AppModule extends AbstractModule {
install(new QuotaModule());
install(new JettyServerModule());
install(new SchedulerDriverModule());
+ install(new SchedulerServicesModule());
install(new SchedulerModule());
install(new StateModule());
install(new SlaModule());
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b6217df1/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index 4e37f4c..03cbe24 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -27,18 +27,17 @@ import javax.inject.Singleton;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.RateLimiter;
import com.google.inject.AbstractModule;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.PrivateModule;
import com.google.inject.TypeLiteral;
-import com.twitter.common.application.modules.LifecycleModule;
import com.twitter.common.args.Arg;
import com.twitter.common.args.CmdLine;
import com.twitter.common.args.constraints.NotNegative;
import com.twitter.common.args.constraints.Positive;
-import com.twitter.common.base.Command;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.stats.StatsProvider;
@@ -46,7 +45,7 @@ import com.twitter.common.util.BackoffStrategy;
import com.twitter.common.util.Random;
import com.twitter.common.util.TruncatedBinaryBackoff;
-import org.apache.aurora.scheduler.SchedulerModule;
+import org.apache.aurora.scheduler.SchedulerServicesModule;
import org.apache.aurora.scheduler.async.GcExecutorLauncher.GcExecutorSettings;
import org.apache.aurora.scheduler.async.GcExecutorLauncher.RandomGcExecutorSettings;
import org.apache.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
@@ -239,7 +238,7 @@ public class AsyncModule extends AbstractModule {
final ScheduledThreadPoolExecutor executor =
AsyncUtil.loggingScheduledExecutor(ASYNC_WORKER_THREADS.get(), "AsyncProcessor-%d", LOG);
bind(ScheduledThreadPoolExecutor.class).annotatedWith(AsyncExecutor.class).toInstance(executor);
- LifecycleModule.bindStartupAction(binder(), RegisterGauges.class);
+ SchedulerServicesModule.addAppStartupServiceBinding(binder()).to(RegisterGauges.class);
// AsyncModule itself is not a subclass of PrivateModule because TaskEventModule internally uses
// a MultiBinder, which cannot span multiple injectors.
@@ -255,7 +254,7 @@ public class AsyncModule extends AbstractModule {
}
});
PubsubEventModule.bindSubscriber(binder(), TaskTimeout.class);
- SchedulerModule.addSchedulerActiveServiceBinding(binder()).to(TaskTimeout.class);
+ SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(TaskTimeout.class);
install(new PrivateModule() {
@Override
@@ -366,7 +365,8 @@ public class AsyncModule extends AbstractModule {
expose(JobUpdateHistoryPruner.class);
}
});
- LifecycleModule.bindStartupAction(binder(), JobUpdateHistoryPruner.class);
+ SchedulerServicesModule.addSchedulerActiveServiceBinding(binder())
+ .to(JobUpdateHistoryPruner.class);
install(new PrivateModule() {
@Override
@@ -409,7 +409,7 @@ public class AsyncModule extends AbstractModule {
PubsubEventModule.bindSubscriber(binder, TaskScheduler.class);
}
- static class RegisterGauges implements Command {
+ static class RegisterGauges extends AbstractIdleService {
private final StatsProvider statsProvider;
private final ScheduledThreadPoolExecutor executor;
@@ -423,7 +423,7 @@ public class AsyncModule extends AbstractModule {
}
@Override
- public void execute() throws RuntimeException {
+ protected void startUp() {
statsProvider.makeGauge(
TIMEOUT_QUEUE_GAUGE,
new Supplier<Integer>() {
@@ -442,5 +442,10 @@ public class AsyncModule extends AbstractModule {
}
);
}
+
+ @Override
+ protected void shutDown() {
+ // Nothing to do - await VM shutdown.
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b6217df1/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java
index 8e9a1dc..7a349bb 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java
@@ -21,7 +21,7 @@ import java.util.logging.Logger;
import javax.inject.Inject;
import com.google.common.base.Joiner;
-import com.twitter.common.base.Command;
+import com.google.common.util.concurrent.AbstractIdleService;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.util.Clock;
@@ -35,7 +35,7 @@ import static java.util.Objects.requireNonNull;
/**
* Prunes per-job update history on a periodic basis.
*/
-class JobUpdateHistoryPruner implements Command {
+class JobUpdateHistoryPruner extends AbstractIdleService {
private static final Logger LOG = Logger.getLogger(JobUpdateHistoryPruner.class.getName());
private final Clock clock;
@@ -73,7 +73,7 @@ class JobUpdateHistoryPruner implements Command {
}
@Override
- public void execute() throws RuntimeException {
+ protected void startUp() {
executor.scheduleAtFixedRate(
new Runnable() {
@Override
@@ -96,4 +96,9 @@ class JobUpdateHistoryPruner implements Command {
settings.pruneInterval.as(Time.MILLISECONDS),
TimeUnit.MILLISECONDS);
}
+
+ @Override
+ protected void shutDown() {
+ // Nothing to do - await VM shutdown.
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b6217df1/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
index 22c666e..89e5258 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
@@ -28,7 +28,7 @@ import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.util.BackoffHelper;
-import org.apache.aurora.scheduler.SchedulerModule;
+import org.apache.aurora.scheduler.SchedulerServicesModule;
import org.apache.aurora.scheduler.cron.CronJobManager;
import org.apache.aurora.scheduler.cron.CronPredictor;
import org.apache.aurora.scheduler.cron.CronScheduler;
@@ -90,7 +90,7 @@ public class CronModule extends AbstractModule {
new BackoffHelper(CRON_START_INITIAL_BACKOFF.get(), CRON_START_MAX_BACKOFF.get())));
bind(CronLifecycle.class).in(Singleton.class);
- SchedulerModule.addSchedulerActiveServiceBinding(binder()).to(CronLifecycle.class);
+ SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(CronLifecycle.class);
}
@Provides
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b6217df1/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
index 9e8ba65..3a4d40a 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
@@ -32,6 +32,7 @@ import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.DeadEvent;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.AbstractModule;
@@ -46,6 +47,7 @@ import com.twitter.common.args.constraints.Positive;
import com.twitter.common.base.Command;
import com.twitter.common.stats.StatsProvider;
+import org.apache.aurora.scheduler.SchedulerServicesModule;
import org.apache.aurora.scheduler.events.NotifyingSchedulingFilter.NotifyDelegate;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
@@ -128,7 +130,8 @@ public final class PubsubEventModule extends AbstractModule {
// Ensure at least an empty binding is present.
getSubscriberBinder(binder());
- LifecycleModule.bindStartupAction(binder(), RegisterSubscribers.class);
+ // TODO(ksweeney): Would this be better as a scheduler active service?
+ SchedulerServicesModule.addAppStartupServiceBinding(binder()).to(RegisterSubscribers.class);
}
private class DeadEventHandler {
@@ -164,7 +167,7 @@ public final class PubsubEventModule extends AbstractModule {
}
}
- static class RegisterSubscribers implements Command {
+ static class RegisterSubscribers extends AbstractIdleService {
private final EventBus eventBus;
private final Set<EventSubscriber> subscribers;
@@ -175,11 +178,16 @@ public final class PubsubEventModule extends AbstractModule {
}
@Override
- public void execute() {
+ protected void startUp() {
for (EventSubscriber subscriber : subscribers) {
eventBus.register(subscriber);
}
}
+
+ @Override
+ protected void shutDown() {
+ // Nothing to do - await VM shutdown.
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b6217df1/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java b/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java
index 36706f2..4b92815 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java
@@ -27,6 +27,7 @@ import javax.ws.rs.HttpMethod;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.AbstractModule;
import com.google.inject.Injector;
import com.google.inject.TypeLiteral;
@@ -41,7 +42,6 @@ import com.twitter.common.application.modules.LifecycleModule.LaunchException;
import com.twitter.common.args.Arg;
import com.twitter.common.args.CmdLine;
import com.twitter.common.base.Command;
-import com.twitter.common.base.ExceptionalCommand;
import com.twitter.common.base.ExceptionalSupplier;
import com.twitter.common.base.MoreSuppliers;
import com.twitter.common.net.http.handlers.AbortHandler;
@@ -56,6 +56,7 @@ import com.twitter.common.net.http.handlers.VarsHandler;
import com.twitter.common.net.http.handlers.VarsJsonHandler;
import com.twitter.common.net.pool.DynamicHostSet.MonitorException;
+import org.apache.aurora.scheduler.SchedulerServicesModule;
import org.apache.aurora.scheduler.http.api.ApiBeta;
import org.eclipse.jetty.rewrite.handler.RewriteHandler;
import org.eclipse.jetty.rewrite.handler.RewriteRegexRule;
@@ -127,11 +128,10 @@ public class JettyServerModule extends AbstractModule {
LifecycleModule.bindServiceRunner(binder(), HttpServerLauncher.class);
bind(LeaderRedirect.class).in(Singleton.class);
- LifecycleModule.bindStartupAction(binder(), RedirectMonitor.class);
+ SchedulerServicesModule.addAppStartupServiceBinding(binder()).to(RedirectMonitor.class);
}
- static class RedirectMonitor implements ExceptionalCommand<MonitorException> {
-
+ static class RedirectMonitor extends AbstractIdleService {
private final LeaderRedirect redirector;
@Inject
@@ -140,9 +140,14 @@ public class JettyServerModule extends AbstractModule {
}
@Override
- public void execute() throws MonitorException {
+ public void startUp() throws MonitorException {
redirector.monitor();
}
+
+ @Override
+ protected void shutDown() {
+ // Nothing to do here - we await VM shutdown.
+ }
}
// TODO(wfarner): Use guava's Service to enforce the lifecycle of this.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b6217df1/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
index 354844a..64e986f 100644
--- a/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
@@ -32,7 +32,7 @@ import com.twitter.common.args.constraints.Positive;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
-import org.apache.aurora.scheduler.SchedulerModule;
+import org.apache.aurora.scheduler.SchedulerServicesModule;
import org.apache.aurora.scheduler.base.AsyncUtil;
import org.apache.aurora.scheduler.sla.MetricCalculator.MetricCalculatorSettings;
@@ -81,7 +81,7 @@ public class SlaModule extends AbstractModule {
.toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("SlaStat-%d", LOG));
bind(SlaUpdater.class).in(Singleton.class);
- SchedulerModule.addSchedulerActiveServiceBinding(binder()).to(SlaUpdater.class);
+ SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(SlaUpdater.class);
}
// TODO(ksweeney): This should use AbstractScheduledService.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b6217df1/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
index 1c9904c..bde45db 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
@@ -13,50 +13,37 @@
*/
package org.apache.aurora.scheduler.stats;
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
-
import javax.inject.Inject;
-import javax.inject.Qualifier;
import javax.inject.Singleton;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
+import com.google.common.util.concurrent.AbstractScheduledService;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
import com.google.inject.AbstractModule;
-import com.twitter.common.application.modules.LifecycleModule;
+import com.google.inject.PrivateModule;
import com.twitter.common.args.Arg;
import com.twitter.common.args.CmdLine;
-import com.twitter.common.base.Command;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Data;
import com.twitter.common.quantity.Time;
import org.apache.aurora.gen.ResourceAggregate;
import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.SchedulerServicesModule;
import org.apache.aurora.scheduler.async.OfferQueue;
-import org.apache.aurora.scheduler.base.AsyncUtil;
import org.apache.aurora.scheduler.base.Conversions;
import org.apache.aurora.scheduler.configuration.Resources;
import org.apache.aurora.scheduler.stats.SlotSizeCounter.MachineResource;
import org.apache.aurora.scheduler.stats.SlotSizeCounter.MachineResourceProvider;
import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static java.util.Objects.requireNonNull;
/**
* Module to configure export of cluster-wide resource allocation and consumption statistics.
*/
public class AsyncStatsModule extends AbstractModule {
-
- private static final Logger LOG = Logger.getLogger(AsyncStatsModule.class.getName());
-
@CmdLine(name = "async_task_stat_update_interval",
help = "Interval on which to try to update resource consumption stats.")
private static final Arg<Amount<Long, Time>> TASK_STAT_INTERVAL =
@@ -67,46 +54,83 @@ public class AsyncStatsModule extends AbstractModule {
private static final Arg<Amount<Long, Time>> SLOT_STAT_INTERVAL =
Arg.create(Amount.of(1L, Time.MINUTES));
- @Qualifier
- @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
- private @interface StatExecutor { }
-
@Override
protected void configure() {
- final ScheduledExecutorService executor =
- AsyncUtil.singleThreadLoggingScheduledExecutor("AsyncStat-%d", LOG);
-
bind(TaskStatCalculator.class).in(Singleton.class);
bind(CachedCounters.class).in(Singleton.class);
bind(MachineResourceProvider.class).to(OfferAdapter.class);
bind(SlotSizeCounter.class).in(Singleton.class);
- bind(ScheduledExecutorService.class).annotatedWith(StatExecutor.class).toInstance(executor);
- LifecycleModule.bindStartupAction(binder(), StatUpdater.class);
+ install(new PrivateModule() {
+ @Override
+ protected void configure() {
+ bind(TaskStatUpdaterService.class).in(Singleton.class);
+ bind(Scheduler.class).toInstance(
+ Scheduler.newFixedRateSchedule(
+ TASK_STAT_INTERVAL.get().getValue(),
+ TASK_STAT_INTERVAL.get().getValue(),
+ TASK_STAT_INTERVAL.get().getUnit().getTimeUnit()));
+ expose(TaskStatUpdaterService.class);
+ }
+ });
+ SchedulerServicesModule.addSchedulerActiveServiceBinding(binder())
+ .to(TaskStatUpdaterService.class);
+
+ install(new PrivateModule() {
+ @Override
+ protected void configure() {
+ bind(SlotSizeCounterService.class).in(Singleton.class);
+ bind(Scheduler.class).toInstance(
+ Scheduler.newFixedRateSchedule(
+ SLOT_STAT_INTERVAL.get().getValue(),
+ SLOT_STAT_INTERVAL.get().getValue(),
+ SLOT_STAT_INTERVAL.get().getUnit().getTimeUnit()));
+ expose(SlotSizeCounterService.class);
+ }
+ });
+ SchedulerServicesModule.addSchedulerActiveServiceBinding(binder())
+ .to(SlotSizeCounterService.class);
}
- static class StatUpdater implements Command {
- private final ScheduledExecutorService executor;
+ static class TaskStatUpdaterService extends AbstractScheduledService {
private final TaskStatCalculator taskStats;
- private final SlotSizeCounter slotCounter;
+ private final Scheduler schedule;
@Inject
- StatUpdater(
- @StatExecutor ScheduledExecutorService executor,
- TaskStatCalculator taskStats,
- SlotSizeCounter slotCounter) {
-
- this.executor = requireNonNull(executor);
+ TaskStatUpdaterService(TaskStatCalculator taskStats, Scheduler schedule) {
this.taskStats = requireNonNull(taskStats);
- this.slotCounter = requireNonNull(slotCounter);
+ this.schedule = requireNonNull(schedule);
+ }
+
+ @Override
+ protected void runOneIteration() {
+ taskStats.run();
+ }
+
+ @Override
+ protected Scheduler scheduler() {
+ return schedule;
+ }
+ }
+
+ static class SlotSizeCounterService extends AbstractScheduledService {
+ private final SlotSizeCounter slotSizeCounter;
+ private final Scheduler schedule;
+
+ @Inject
+ SlotSizeCounterService(SlotSizeCounter slotSizeCounter, Scheduler schedule) {
+ this.slotSizeCounter = requireNonNull(slotSizeCounter);
+ this.schedule = requireNonNull(schedule);
+ }
+
+ @Override
+ protected void runOneIteration() {
+ slotSizeCounter.run();
}
@Override
- public void execute() {
- long taskInterval = TASK_STAT_INTERVAL.get().as(Time.SECONDS);
- executor.scheduleAtFixedRate(taskStats, taskInterval, taskInterval, TimeUnit.SECONDS);
- long slotInterval = SLOT_STAT_INTERVAL.get().as(Time.SECONDS);
- executor.scheduleAtFixedRate(slotCounter, slotInterval, slotInterval, TimeUnit.SECONDS);
+ protected Scheduler scheduler() {
+ return schedule;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b6217df1/src/main/java/org/apache/aurora/scheduler/stats/TaskStatCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/TaskStatCalculator.java b/src/main/java/org/apache/aurora/scheduler/stats/TaskStatCalculator.java
index a7f9401..c1e7197 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/TaskStatCalculator.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/TaskStatCalculator.java
@@ -17,7 +17,6 @@ import java.util.logging.Logger;
import javax.inject.Inject;
-import org.apache.aurora.scheduler.stats.AsyncStatsModule.StatUpdater;
import org.apache.aurora.scheduler.stats.ResourceCounter.GlobalMetric;
import org.apache.aurora.scheduler.stats.ResourceCounter.Metric;
import org.apache.aurora.scheduler.storage.Storage.StorageException;
@@ -28,7 +27,7 @@ import static java.util.Objects.requireNonNull;
* Calculates and exports aggregate stats about resources consumed by active tasks.
*/
class TaskStatCalculator implements Runnable {
- private static final Logger LOG = Logger.getLogger(StatUpdater.class.getName());
+ private static final Logger LOG = Logger.getLogger(TaskStatCalculator.class.getName());
private final CachedCounters counters;
private final ResourceCounter resourceCounter;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b6217df1/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java b/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
index 962aff8..e007c30 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
@@ -13,15 +13,19 @@
*/
package org.apache.aurora.scheduler.async;
+import java.util.Set;
+
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.Service;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
+import com.google.inject.TypeLiteral;
import com.twitter.common.application.StartupStage;
import com.twitter.common.application.modules.LifecycleModule;
import com.twitter.common.base.ExceptionalCommand;
@@ -29,6 +33,7 @@ import com.twitter.common.stats.StatsProvider;
import com.twitter.common.testing.easymock.EasyMockTest;
import com.twitter.common.util.Clock;
+import org.apache.aurora.scheduler.AppStartup;
import org.apache.aurora.scheduler.filter.AttributeAggregate;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
import org.apache.aurora.scheduler.mesos.Driver;
@@ -92,7 +97,11 @@ public class AsyncModuleTest extends EasyMockTest {
control.replay();
- injector.getInstance(Key.get(ExceptionalCommand.class, StartupStage.class)).execute();
+ Set<Service> services = injector.getInstance(
+ Key.get(new TypeLiteral<Set<Service>>() { }, AppStartup.class));
+ for (Service service : services) {
+ service.startAsync().awaitRunning();
+ }
injector.getBindings();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b6217df1/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java
index 1376237..1cd693a 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java
@@ -56,7 +56,7 @@ public class JobUpdateHistoryPrunerTest extends EasyMockTest {
Amount.of(1L, Time.MILLISECONDS),
1));
- pruner.execute();
+ pruner.startAsync().awaitRunning();
executorClock.advance(Amount.of(2L, Time.MILLISECONDS));
executorClock.advance(Amount.of(1L, Time.MILLISECONDS));
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b6217df1/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java b/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java
index e4e252e..0faee92 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/KillRetryTest.java
@@ -24,10 +24,7 @@ import com.google.common.testing.TearDown;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
-import com.google.inject.Key;
-import com.twitter.common.application.StartupStage;
import com.twitter.common.application.modules.LifecycleModule;
-import com.twitter.common.base.ExceptionalCommand;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.stats.StatsProvider;
@@ -41,6 +38,7 @@ import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.events.PubsubEventModule;
import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.state.PubsubTestUtil;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
@@ -98,7 +96,7 @@ public class KillRetryTest extends EasyMockTest {
}
);
eventBus = injector.getInstance(EventBus.class);
- injector.getInstance(Key.get(ExceptionalCommand.class, StartupStage.class)).execute();
+ PubsubTestUtil.startPubsub(injector);
}
private static IScheduledTask makeTask(String id, ScheduleStatus status) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b6217df1/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java b/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java
index 4d58240..fbc3da3 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java
@@ -45,6 +45,7 @@ import com.twitter.thrift.ServiceInstance;
import org.apache.aurora.gen.AuroraAdmin;
import org.apache.aurora.gen.ServerInfo;
+import org.apache.aurora.scheduler.SchedulerServicesModule;
import org.apache.aurora.scheduler.async.OfferQueue;
import org.apache.aurora.scheduler.async.RescheduleCalculator;
import org.apache.aurora.scheduler.async.TaskGroups.TaskGroupsSettings;
@@ -88,6 +89,7 @@ public abstract class JettyServerModuleTest extends EasyMockTest {
new JettyServerModule(),
new StatsModule(),
new LifecycleModule(),
+ new SchedulerServicesModule(),
new AbstractModule() {
<T> T bindMock(Class<T> clazz) {
T mock = createMock(clazz);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b6217df1/src/test/java/org/apache/aurora/scheduler/state/PubsubTestUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/PubsubTestUtil.java b/src/test/java/org/apache/aurora/scheduler/state/PubsubTestUtil.java
index 48b3d88..23a316d 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/PubsubTestUtil.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/PubsubTestUtil.java
@@ -15,12 +15,12 @@ package org.apache.aurora.scheduler.state;
import java.util.Set;
+import com.google.common.util.concurrent.Service;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
-import com.twitter.common.application.StartupStage;
-import com.twitter.common.base.ExceptionalCommand;
+import org.apache.aurora.scheduler.AppStartup;
import org.apache.aurora.scheduler.events.EventSink;
/**
@@ -44,11 +44,11 @@ public final class PubsubTestUtil {
// TODO(wfarner): Make it easier to write a unit test wired for pubsub events.
// In this case, a trade-off was made to avoid installing several distant modules and providing
// required bindings that seem unrelated from this code.
- @SuppressWarnings("rawtypes")
- Set<ExceptionalCommand> startupCommands = injector.getInstance(
- Key.get(new TypeLiteral<Set<ExceptionalCommand>>() { }, StartupStage.class));
- for (ExceptionalCommand<?> command : startupCommands) {
- command.execute();
+ Set<Service> services = injector.getInstance(
+ Key.get(new TypeLiteral<Set<Service>>() { }, AppStartup.class));
+
+ for (Service service : services) {
+ service.startAsync().awaitRunning();
}
return injector.getInstance(EventSink.class);
}