You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/04 02:14:34 UTC
[2/2] git commit: Replace StorageStarted event with TaskStateChange
events.
Replace StorageStarted event with TaskStateChange events.
This change makes it easier to consume events about the state of tasks.
Previously, nearly every consumer of TaskStateChange also needed to consume
StorageStarted and query the full storage. The new change makes TaskStateChange
more intuitive and therefore less error-prone.
Reviewed at https://reviews.apache.org/r/16575/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/d0b46fc3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/d0b46fc3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/d0b46fc3
Branch: refs/heads/master
Commit: d0b46fc3b316a9bc56e616be05dac4a9570d30e5
Parents: 9f35586
Author: Bill Farner <wf...@apache.org>
Authored: Fri Jan 3 17:14:19 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Fri Jan 3 17:14:19 2014 -0800
----------------------------------------------------------------------
.../aurora/scheduler/MesosSchedulerImpl.java | 14 ++-
.../aurora/scheduler/SchedulerLifecycle.java | 28 +++--
.../org/apache/aurora/scheduler/TaskVars.java | 31 ++----
.../aurora/scheduler/async/HistoryPruner.java | 33 +-----
.../scheduler/async/RescheduleCalculator.java | 3 +
.../aurora/scheduler/async/TaskGroups.java | 35 ++-----
.../aurora/scheduler/async/TaskScheduler.java | 2 +-
.../aurora/scheduler/async/TaskTimeout.java | 21 +---
.../aurora/scheduler/events/EventSink.java | 30 ++++++
.../events/NotifyingMethodInterceptor.java | 65 ------------
.../events/NotifyingSchedulingFilter.java | 7 +-
.../aurora/scheduler/events/PubsubEvent.java | 84 ++++++---------
.../scheduler/events/PubsubEventModule.java | 29 +-----
.../aurora/scheduler/metadata/NearestFit.java | 8 +-
.../aurora/scheduler/state/CronJobManager.java | 8 +-
.../scheduler/state/MaintenanceController.java | 40 +-------
.../scheduler/state/SideEffectStorage.java | 10 +-
.../scheduler/state/StateManagerImpl.java | 9 +-
.../storage/CallOrderEnforcingStorage.java | 25 ++++-
.../scheduler/MesosSchedulerImplTest.java | 15 ++-
.../scheduler/SchedulerLifecycleTest.java | 8 +-
.../apache/aurora/scheduler/TaskVarsTest.java | 67 +++++-------
.../scheduler/async/HistoryPrunerTest.java | 39 ++-----
.../scheduler/async/TaskSchedulerImplTest.java | 10 +-
.../scheduler/async/TaskSchedulerTest.java | 21 ++--
.../aurora/scheduler/async/TaskTimeoutTest.java | 40 ++------
.../events/NotifyingMethodInterceptorTest.java | 101 -------------------
.../events/NotifyingSchedulingFilterTest.java | 7 +-
.../scheduler/metadata/NearestFitTest.java | 2 +-
.../state/BaseSchedulerCoreImplTest.java | 8 +-
.../scheduler/state/CronJobManagerTest.java | 9 +-
.../state/MaintenanceControllerImplTest.java | 37 ++-----
.../aurora/scheduler/state/PubsubTestUtil.java | 7 +-
.../scheduler/state/StateManagerImplTest.java | 18 ++--
34 files changed, 268 insertions(+), 603 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
index 9033922..2e7d7f7 100644
--- a/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/MesosSchedulerImpl.java
@@ -35,8 +35,9 @@ import org.apache.aurora.gen.comm.SchedulerMessage;
import org.apache.aurora.scheduler.base.Conversions;
import org.apache.aurora.scheduler.base.SchedulerException;
import org.apache.aurora.scheduler.configuration.Resources;
-import org.apache.aurora.scheduler.events.PubsubEvent.Interceptors.Event;
-import org.apache.aurora.scheduler.events.PubsubEvent.Interceptors.SendNotification;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
+import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
import org.apache.aurora.scheduler.state.SchedulerCore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
@@ -74,6 +75,7 @@ class MesosSchedulerImpl implements Scheduler {
private final Storage storage;
private final SchedulerCore schedulerCore;
private final Lifecycle lifecycle;
+ private final EventSink eventSink;
private volatile boolean registered = false;
/**
@@ -88,12 +90,14 @@ class MesosSchedulerImpl implements Scheduler {
Storage storage,
SchedulerCore schedulerCore,
final Lifecycle lifecycle,
- List<TaskLauncher> taskLaunchers) {
+ List<TaskLauncher> taskLaunchers,
+ EventSink eventSink) {
this.storage = checkNotNull(storage);
this.schedulerCore = checkNotNull(schedulerCore);
this.lifecycle = checkNotNull(lifecycle);
this.taskLaunchers = checkNotNull(taskLaunchers);
+ this.eventSink = checkNotNull(eventSink);
}
@Override
@@ -101,7 +105,6 @@ class MesosSchedulerImpl implements Scheduler {
LOG.info("Received notification of lost slave: " + slaveId);
}
- @SendNotification(after = Event.DriverRegistered)
@Override
public void registered(
SchedulerDriver driver,
@@ -116,13 +119,14 @@ class MesosSchedulerImpl implements Scheduler {
}
});
registered = true;
+ eventSink.post(new DriverRegistered());
}
- @SendNotification(after = Event.DriverDisconnected)
@Override
public void disconnected(SchedulerDriver schedulerDriver) {
LOG.warning("Framework disconnected.");
frameworkDisconnects.incrementAndGet();
+ eventSink.post(new DriverDisconnected());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
index a54c342..bf926a7 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
@@ -46,8 +46,10 @@ import com.twitter.common.zookeeper.Group.JoinException;
import com.twitter.common.zookeeper.ServerSet;
import com.twitter.common.zookeeper.SingletonService.LeaderControl;
+import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.Storage.MutateWork;
import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
@@ -95,7 +97,7 @@ public class SchedulerLifecycle implements EventSubscriber {
STORAGE_PREPARED,
LEADER_AWAITING_REGISTRATION,
REGISTERED_LEADER,
- RUNNING,
+ ACTIVE,
DEAD
}
@@ -114,14 +116,15 @@ public class SchedulerLifecycle implements EventSubscriber {
@Inject
SchedulerLifecycle(
- final DriverFactory driverFactory,
- final NonVolatileStorage storage,
- final Lifecycle lifecycle,
- final Driver driver,
- final DriverReference driverRef,
+ DriverFactory driverFactory,
+ NonVolatileStorage storage,
+ Lifecycle lifecycle,
+ Driver driver,
+ DriverReference driverRef,
final LeadingOptions leadingOptions,
final ScheduledExecutorService executorService,
- final Clock clock) {
+ Clock clock,
+ EventSink eventSink) {
this(
driverFactory,
@@ -150,7 +153,8 @@ public class SchedulerLifecycle implements EventSubscriber {
leadingOptions.registrationDelayLimit.getUnit().getTimeUnit());
}
},
- clock);
+ clock,
+ eventSink);
}
@VisibleForTesting
@@ -163,7 +167,8 @@ public class SchedulerLifecycle implements EventSubscriber {
final Driver driver,
final DriverReference driverRef,
final DelayedActions delayedActions,
- final Clock clock) {
+ final Clock clock,
+ final EventSink eventSink) {
Stats.export(new StatImpl<Integer>("framework_registered") {
@Override public Integer read() {
@@ -241,6 +246,7 @@ public class SchedulerLifecycle implements EventSubscriber {
final Closure<Transition<State>> handleRegistered = new Closure<Transition<State>>() {
@Override public void execute(Transition<State> transition) {
registrationAcked.set(true);
+ eventSink.post(new SchedulerActive());
try {
leaderControl.get().advertise();
} catch (JoinException e) {
@@ -306,9 +312,9 @@ public class SchedulerLifecycle implements EventSubscriber {
State.REGISTERED_LEADER, State.DEAD)
.addState(
State.REGISTERED_LEADER,
- State.RUNNING, State.DEAD)
+ State.ACTIVE, State.DEAD)
.addState(
- State.RUNNING,
+ State.ACTIVE,
State.DEAD)
.addState(
State.DEAD,
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/TaskVars.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskVars.java b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
index 32ec939..d5d752e 100644
--- a/src/main/java/org/apache/aurora/scheduler/TaskVars.java
+++ b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
@@ -34,9 +34,8 @@ import com.twitter.common.stats.StatsProvider;
import org.apache.aurora.gen.Attribute;
import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.StorageStarted;
+import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
import org.apache.aurora.scheduler.storage.AttributeStore;
@@ -53,12 +52,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
class TaskVars implements EventSubscriber {
private static final Logger LOG = Logger.getLogger(TaskVars.class.getName());
- // Used to ignore pubsub events sent before storage has completely started. This avoids a
- // miscount where a StorageStarted consumer is invoked before storageStarted is invoked here,
- // and pubsub events are fired for tasks that we have not yet counted. For example, if
- // tasksDeleted is invoked, we would end up with a negative count.
- private volatile boolean storageStarted = false;
-
private final LoadingCache<String, AtomicLong> countersByStatus;
private final LoadingCache<String, AtomicLong> countersByRack;
@@ -116,14 +109,12 @@ class TaskVars implements EventSubscriber {
@Subscribe
public void taskChangedState(TaskStateChange stateChange) {
- if (!storageStarted) {
- return;
- }
-
IScheduledTask task = stateChange.getTask();
- if (stateChange.getOldState() != ScheduleStatus.INIT) {
- decrementCount(stateChange.getOldState());
+ Optional<ScheduleStatus> previousState = stateChange.getOldState();
+ if (stateChange.isTransition() && !previousState.equals(Optional.of(ScheduleStatus.INIT))) {
+ decrementCount(previousState.get());
}
+
incrementCount(task.getStatus());
if (stateChange.getNewState() == ScheduleStatus.LOST) {
@@ -146,10 +137,9 @@ class TaskVars implements EventSubscriber {
}
@Subscribe
- public void storageStarted(StorageStarted event) {
- for (IScheduledTask task : Storage.Util.consistentFetchTasks(storage, Query.unscoped())) {
- incrementCount(task.getStatus());
- }
+ public void schedulerActive(SchedulerActive event) {
+ // TODO(wfarner): This should probably induce the initial 'export' of stats, so that incomplete
+ // values are not surfaced while storage is recovering.
// Dummy read the counter for each status counter. This is important to guarantee a stat with
// value zero is present for each state, even if all states are not represented in the task
@@ -157,15 +147,10 @@ class TaskVars implements EventSubscriber {
for (ScheduleStatus status : ScheduleStatus.values()) {
getCounter(status);
}
- storageStarted = true;
}
@Subscribe
public void tasksDeleted(final TasksDeleted event) {
- if (!storageStarted) {
- return;
- }
-
for (IScheduledTask task : event.getTasks()) {
decrementCount(task.getStatus());
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
index 462e0df..9f77bfc 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
@@ -41,10 +41,8 @@ import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.util.Clock;
-import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -55,9 +53,7 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.aurora.scheduler.base.Tasks.LATEST_ACTIVITY;
import static org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import static org.apache.aurora.scheduler.events.PubsubEvent.StorageStarted;
import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import static org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
@@ -68,9 +64,6 @@ import static org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
public class HistoryPruner implements EventSubscriber {
private static final Logger LOG = Logger.getLogger(HistoryPruner.class.getName());
- @VisibleForTesting
- static final Query.Builder INACTIVE_QUERY = Query.unscoped().terminal();
-
private final Multimap<IJobKey, String> tasksByJob =
Multimaps.synchronizedSetMultimap(LinkedHashMultimap.<IJobKey, String>create());
@VisibleForTesting
@@ -79,7 +72,6 @@ public class HistoryPruner implements EventSubscriber {
}
private final ScheduledExecutorService executor;
- private final Storage storage;
private final StateManager stateManager;
private final Clock clock;
private final long pruneThresholdMillis;
@@ -93,14 +85,12 @@ public class HistoryPruner implements EventSubscriber {
@Inject
HistoryPruner(
final ScheduledExecutorService executor,
- final Storage storage,
final StateManager stateManager,
final Clock clock,
@PruneThreshold Amount<Long, Time> inactivePruneThreshold,
@PruneThreshold int perJobHistoryGoal) {
this.executor = checkNotNull(executor);
- this.storage = checkNotNull(storage);
this.stateManager = checkNotNull(stateManager);
this.clock = checkNotNull(clock);
this.pruneThresholdMillis = inactivePruneThreshold.as(Time.MILLISECONDS);
@@ -120,28 +110,13 @@ public class HistoryPruner implements EventSubscriber {
@Subscribe
public void recordStateChange(TaskStateChange change) {
if (Tasks.isTerminated(change.getNewState())) {
+ long timeoutBasis = change.isTransition()
+ ? clock.nowMillis()
+ : Iterables.getLast(change.getTask().getTaskEvents()).getTimestamp();
registerInactiveTask(
Tasks.SCHEDULED_TO_JOB_KEY.apply(change.getTask()),
change.getTaskId(),
- calculateTimeout(clock.nowMillis()));
- }
- }
-
- /**
- * When triggered, iterates through inactive tasks in the system and prunes tasks that
- * exceed the history goal for a job or are beyond the time threshold.
- *
- * @param event A new StorageStarted event.
- */
- @Subscribe
- public void storageStarted(StorageStarted event) {
- for (IScheduledTask task
- : LATEST_ACTIVITY.sortedCopy(Storage.Util.consistentFetchTasks(storage, INACTIVE_QUERY))) {
-
- registerInactiveTask(
- Tasks.SCHEDULED_TO_JOB_KEY.apply(task),
- Tasks.id(task),
- calculateTimeout(Iterables.getLast(task.getTaskEvents()).getTimestamp()));
+ calculateTimeout(timeoutBasis));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
index fb4d2b9..0265bf9 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
@@ -68,6 +68,8 @@ interface RescheduleCalculator {
*/
long getReadyTimeMs(IScheduledTask task);
+ // TODO(wfarner): Create a unit test for this class. It currently piggybacks on
+ // TaskSchedulerTest. Once a unit test exists, TaskSchedulerTest should use a mock.
class RescheduleCalculatorImpl implements RescheduleCalculator {
private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName());
@@ -75,6 +77,7 @@ interface RescheduleCalculator {
private final Storage storage;
private final RescheduleCalculatorSettings settings;
private final Clock clock;
+ // TODO(wfarner): Inject 'random' in the constructor for better test coverage.
private final Random random = new Random.SystemRandom(new java.util.Random());
private static final Predicate<ScheduleStatus> IS_ACTIVE_STATUS =
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
index 1119344..b50c625 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java
@@ -22,6 +22,7 @@ import java.util.logging.Logger;
import javax.inject.Inject;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
@@ -41,13 +42,10 @@ import com.twitter.common.util.Clock;
import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.StorageStarted;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
-import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -70,7 +68,6 @@ public class TaskGroups implements EventSubscriber {
private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName());
- private final Storage storage;
private final LoadingCache<GroupKey, TaskGroup> groups;
private final Clock clock;
private final RescheduleCalculator rescheduleCalculator;
@@ -88,7 +85,6 @@ public class TaskGroups implements EventSubscriber {
@Inject
TaskGroups(
ShutdownRegistry shutdownRegistry,
- Storage storage,
TaskGroupsSettings settings,
TaskScheduler taskScheduler,
Clock clock,
@@ -96,7 +92,6 @@ public class TaskGroups implements EventSubscriber {
this(
createThreadPool(shutdownRegistry),
- storage,
settings.taskGroupBackoff,
settings.rateLimiter,
taskScheduler,
@@ -104,16 +99,15 @@ public class TaskGroups implements EventSubscriber {
rescheduleCalculator);
}
+ @VisibleForTesting
TaskGroups(
final ScheduledExecutorService executor,
- final Storage storage,
final BackoffStrategy taskGroupBackoffStrategy,
final RateLimiter rateLimiter,
final TaskScheduler taskScheduler,
final Clock clock,
final RescheduleCalculator rescheduleCalculator) {
- this.storage = checkNotNull(storage);
checkNotNull(executor);
checkNotNull(taskGroupBackoffStrategy);
checkNotNull(rateLimiter);
@@ -222,26 +216,11 @@ public class TaskGroups implements EventSubscriber {
@Subscribe
public synchronized void taskChangedState(TaskStateChange stateChange) {
if (stateChange.getNewState() == PENDING) {
- add(
- stateChange.getTask().getAssignedTask(),
- rescheduleCalculator.getReadyTimeMs(stateChange.getTask()));
- }
- }
-
- /**
- * Signals that storage has started and is consistent.
- * <p>
- * Upon this signal, all {@link org.apache.aurora.gen.ScheduleStatus#PENDING} tasks in the stoage
- * will become eligible for scheduling.
- *
- * @param event Storage started notification.
- */
- @Subscribe
- public void storageStarted(StorageStarted event) {
- for (IScheduledTask task
- : Storage.Util.consistentFetchTasks(storage, Query.unscoped().byStatus(PENDING))) {
-
- add(task.getAssignedTask(), rescheduleCalculator.getStartupReadyTimeMs(task));
+ IScheduledTask task = stateChange.getTask();
+ long readyAtMs = stateChange.isTransition()
+ ? rescheduleCalculator.getReadyTimeMs(task)
+ : rescheduleCalculator.getStartupReadyTimeMs(task);
+ add(task.getAssignedTask(), readyAtMs);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index 4a91d9f..96c76ba 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -210,7 +210,7 @@ interface TaskScheduler extends EventSubscriber {
@Subscribe
public void taskChanged(final TaskStateChange stateChangeEvent) {
- if (stateChangeEvent.getOldState() == PENDING) {
+ if (Optional.of(PENDING).equals(stateChangeEvent.getOldState())) {
reservations.invalidateTask(stateChangeEvent.getTaskId());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
index 046befb..9f14a99 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
@@ -43,13 +43,9 @@ import com.twitter.common.util.Clock;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.StorageStarted;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -76,9 +72,6 @@ class TaskTimeout implements EventSubscriber {
ScheduleStatus.RESTARTING,
ScheduleStatus.KILLING);
- @VisibleForTesting
- static final Query.Builder TRANSIENT_QUERY = Query.unscoped().byStatus(TRANSIENT_STATES);
-
private final Map<TimeoutKey, Context> futures = Maps.newConcurrentMap();
private static final class TimeoutKey {
@@ -111,7 +104,6 @@ class TaskTimeout implements EventSubscriber {
}
}
- private final Storage storage;
private final ScheduledExecutorService executor;
private final StateManager stateManager;
private final long timeoutMillis;
@@ -120,14 +112,12 @@ class TaskTimeout implements EventSubscriber {
@Inject
TaskTimeout(
- Storage storage,
ScheduledExecutorService executor,
StateManager stateManager,
final Clock clock,
Amount<Long, Time> timeout,
StatsProvider statsProvider) {
- this.storage = checkNotNull(storage);
this.executor = checkNotNull(executor);
this.stateManager = checkNotNull(stateManager);
this.timeoutMillis = timeout.as(Time.MILLISECONDS);
@@ -162,8 +152,8 @@ class TaskTimeout implements EventSubscriber {
public void recordStateChange(TaskStateChange change) {
String taskId = change.getTaskId();
ScheduleStatus newState = change.getNewState();
- if (isTransient(change.getOldState())) {
- TimeoutKey oldKey = new TimeoutKey(taskId, change.getOldState());
+ if (change.isTransition() && isTransient(change.getOldState().get())) {
+ TimeoutKey oldKey = new TimeoutKey(taskId, change.getOldState().get());
Context context = futures.remove(oldKey);
if (context != null) {
LOG.fine("Canceling state timeout for task " + oldKey);
@@ -176,13 +166,6 @@ class TaskTimeout implements EventSubscriber {
}
}
- @Subscribe
- public void storageStarted(StorageStarted event) {
- for (IScheduledTask task : Storage.Util.consistentFetchTasks(storage, TRANSIENT_QUERY)) {
- registerTimeout(new TimeoutKey(Tasks.id(task), task.getStatus()));
- }
- }
-
private class TimedOutTaskHandler implements Runnable {
private final TimeoutKey key;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/events/EventSink.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/EventSink.java b/src/main/java/org/apache/aurora/scheduler/events/EventSink.java
new file mode 100644
index 0000000..4fc425e
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/events/EventSink.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.events;
+
+/**
+ * A publishing channel for pubsub events, which will cause the event to be announced to relevant
+ * subscribers.
+ */
+public interface EventSink {
+
+ /**
+ * Posts an event.
+ *
+ * @param event Event to announce.
+ */
+ void post(PubsubEvent event);
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/events/NotifyingMethodInterceptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/NotifyingMethodInterceptor.java b/src/main/java/org/apache/aurora/scheduler/events/NotifyingMethodInterceptor.java
deleted file mode 100644
index 8003262..0000000
--- a/src/main/java/org/apache/aurora/scheduler/events/NotifyingMethodInterceptor.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.events;
-
-import java.lang.reflect.Method;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Preconditions;
-import com.twitter.common.base.Closure;
-
-import org.aopalliance.intercept.MethodInterceptor;
-import org.aopalliance.intercept.MethodInvocation;
-import org.apache.aurora.scheduler.events.PubsubEvent.Interceptors.Event;
-import org.apache.aurora.scheduler.events.PubsubEvent.Interceptors.SendNotification;
-
-/**
- * A method interceptor that sends pubsub notifications before and/or after a method annotated
- * with {@link SendNotification}
- * is invoked.
- */
-class NotifyingMethodInterceptor implements MethodInterceptor {
- private static final Logger LOG = Logger.getLogger(NotifyingMethodInterceptor.class.getName());
-
- @Inject
- private Closure<PubsubEvent> eventSink;
-
- private void maybeFire(Event event) {
- if (event != Event.None) {
- eventSink.execute(event.getEvent());
- }
- }
-
- @Override
- public Object invoke(MethodInvocation invocation) throws Throwable {
- Preconditions.checkNotNull(eventSink, "Event sink has not yet been set.");
-
- Method method = invocation.getMethod();
- SendNotification sendNotification = method.getAnnotation(SendNotification.class);
- if (sendNotification == null) {
- LOG.warning("Interceptor should not match methods without @"
- + SendNotification.class.getSimpleName());
- return invocation.proceed();
- }
-
- maybeFire(sendNotification.before());
- Object result = invocation.proceed();
- maybeFire(sendNotification.after());
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
index e5ab284..c7f4a1b 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
@@ -22,7 +22,6 @@ import java.util.Set;
import javax.inject.Inject;
import com.google.inject.BindingAnnotation;
-import com.twitter.common.base.Closure;
import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
@@ -49,12 +48,12 @@ class NotifyingSchedulingFilter implements SchedulingFilter {
public @interface NotifyDelegate { }
private final SchedulingFilter delegate;
- private final Closure<PubsubEvent> eventSink;
+ private final EventSink eventSink;
@Inject
NotifyingSchedulingFilter(
@NotifyDelegate SchedulingFilter delegate,
- Closure<PubsubEvent> eventSink) {
+ EventSink eventSink) {
this.delegate = checkNotNull(delegate);
this.eventSink = checkNotNull(eventSink);
@@ -64,7 +63,7 @@ class NotifyingSchedulingFilter implements SchedulingFilter {
public Set<Veto> filter(ResourceSlot offer, String slaveHost, ITaskConfig task, String taskId) {
Set<Veto> vetoes = delegate.filter(offer, slaveHost, task, taskId);
if (!vetoes.isEmpty()) {
- eventSink.execute(new Vetoed(taskId, vetoes));
+ eventSink.post(new Vetoed(taskId, vetoes));
}
return vetoes;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
index 971f40c..59e18ea 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEvent.java
@@ -15,11 +15,10 @@
*/
package org.apache.aurora.scheduler.events;
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
import java.util.Set;
import com.google.common.base.Objects;
+import com.google.common.base.Optional;
import org.apache.aurora.gen.HostStatus;
import org.apache.aurora.gen.ScheduleStatus;
@@ -27,9 +26,6 @@ import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
import static com.google.common.base.Preconditions.checkNotNull;
/**
@@ -83,23 +79,48 @@ public interface PubsubEvent {
/**
* Event sent when a task changed state.
*/
- public static class TaskStateChange implements PubsubEvent {
+ public static final class TaskStateChange implements PubsubEvent {
private final IScheduledTask task;
- private final ScheduleStatus oldState;
+ private final Optional<ScheduleStatus> oldState;
- public TaskStateChange(IScheduledTask task, ScheduleStatus oldState) {
+ private TaskStateChange(IScheduledTask task, Optional<ScheduleStatus> oldState) {
this.task = checkNotNull(task);
this.oldState = checkNotNull(oldState);
}
+ /**
+ * Creates a state change event that represents the initial value of a task.
+ *
+ * @param task Task structure.
+ * @return A state change event.
+ */
+ public static TaskStateChange initialized(IScheduledTask task) {
+ return new TaskStateChange(task, Optional.<ScheduleStatus>absent());
+ }
+
+ /**
+ * Creates a state change event that represents a transition from one state to another.
+ *
+ * @param task Current task structure.
+ * @param oldState State the task was previously in.
+ * @return A state change event.
+ */
+ public static TaskStateChange transition(IScheduledTask task, ScheduleStatus oldState) {
+ return new TaskStateChange(task, Optional.of(oldState));
+ }
+
public String getTaskId() {
return Tasks.id(task);
}
- public ScheduleStatus getOldState() {
+ public Optional<ScheduleStatus> getOldState() {
return oldState;
}
+ public boolean isTransition() {
+ return oldState.isPresent();
+ }
+
public IScheduledTask getTask() {
return task;
}
@@ -242,7 +263,7 @@ public interface PubsubEvent {
}
}
- public static class StorageStarted implements PubsubEvent {
+ public static class DriverRegistered implements PubsubEvent {
@Override
public boolean equals(Object o) {
return (o != null) && getClass().equals(o.getClass());
@@ -254,7 +275,7 @@ public interface PubsubEvent {
}
}
- public static class DriverRegistered implements PubsubEvent {
+ public static class DriverDisconnected implements PubsubEvent {
@Override
public boolean equals(Object o) {
return (o != null) && getClass().equals(o.getClass());
@@ -266,7 +287,7 @@ public interface PubsubEvent {
}
}
- public static class DriverDisconnected implements PubsubEvent {
+ public static class SchedulerActive implements PubsubEvent {
@Override
public boolean equals(Object o) {
return (o != null) && getClass().equals(o.getClass());
@@ -277,43 +298,4 @@ public interface PubsubEvent {
return getClass().hashCode();
}
}
-
- public static final class Interceptors {
- private Interceptors() {
- // Utility class.
- }
-
- public enum Event {
- None(null),
- StorageStarted(new StorageStarted()),
- DriverRegistered(new DriverRegistered()),
- DriverDisconnected(new DriverDisconnected());
-
- private final PubsubEvent event;
- private Event(PubsubEvent event) {
- this.event = event;
- }
-
- public PubsubEvent getEvent() {
- return event;
- }
- }
-
- /**
- * An annotation to place on methods of injected classes that which to fire events before
- * and/or after their invocation.
- */
- @Target(METHOD) @Retention(RUNTIME)
- public @interface SendNotification {
- /**
- * Event to fire prior to invocation.
- */
- Event before() default Event.None;
-
- /**
- * Event to fire after invocation.
- */
- Event after() default Event.None;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
index 1260a16..ebdede9 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
@@ -27,17 +27,12 @@ import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.inject.AbstractModule;
import com.google.inject.Binder;
-import com.google.inject.TypeLiteral;
-import com.google.inject.matcher.Matchers;
import com.google.inject.multibindings.Multibinder;
import com.twitter.common.application.modules.LifecycleModule;
-import com.twitter.common.base.Closure;
import com.twitter.common.base.Command;
-import org.aopalliance.intercept.MethodInterceptor;
import org.apache.aurora.scheduler.events.NotifyingSchedulingFilter.NotifyDelegate;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.Interceptors.SendNotification;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -69,17 +64,16 @@ public final class PubsubEventModule extends AbstractModule {
bind(EventBus.class).toInstance(eventBus);
- Closure<PubsubEvent> eventPoster = new Closure<PubsubEvent>() {
- @Override public void execute(PubsubEvent event) {
+ EventSink eventSink = new EventSink() {
+ @Override public void post(PubsubEvent event) {
eventBus.post(event);
}
};
- bind(new TypeLiteral<Closure<PubsubEvent>>() { }).toInstance(eventPoster);
+ bind(EventSink.class).toInstance(eventSink);
// Ensure at least an empty binding is present.
getSubscriberBinder(binder());
LifecycleModule.bindStartupAction(binder(), RegisterSubscribers.class);
- bindNotifyingInterceptor(binder());
}
static class RegisterSubscribers implements Command {
@@ -126,21 +120,4 @@ public final class PubsubEventModule extends AbstractModule {
public static void bindSubscriber(Binder binder, Class<? extends EventSubscriber> subscriber) {
getSubscriberBinder(binder).addBinding().to(subscriber);
}
-
- /**
- * Binds a method interceptor to all methods annotated with {@link SendNotification}.
- * <p>
- * The interceptor will send notifications before and/or after the wrapped method invocation.
- *
- * @param binder Guice binder.
- */
- @VisibleForTesting
- public static void bindNotifyingInterceptor(Binder binder) {
- MethodInterceptor interceptor = new NotifyingMethodInterceptor();
- binder.requestInjection(interceptor);
- binder.bindInterceptor(
- Matchers.any(),
- Matchers.annotatedWith(SendNotification.class),
- interceptor);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java b/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java
index f801f80..96fcbc4 100644
--- a/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java
+++ b/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java
@@ -96,12 +96,12 @@ public class NearestFit implements EventSubscriber {
* Records a task state change event.
* This will ignore any events where the previous state is not {@link ScheduleStatus#PENDING}.
*
- * @param stateChangeEvent Task state change.
+ * @param event Task state change.
*/
@Subscribe
- public synchronized void stateChanged(TaskStateChange stateChangeEvent) {
- if (stateChangeEvent.getOldState() == ScheduleStatus.PENDING) {
- fitByTask.invalidate(stateChangeEvent.getTaskId());
+ public synchronized void stateChanged(TaskStateChange event) {
+ if (event.isTransition() && event.getOldState().get() == ScheduleStatus.PENDING) {
+ fitByTask.invalidate(event.getTaskId());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java b/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
index e1773e9..b772712 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/CronJobManager.java
@@ -61,7 +61,7 @@ import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
import org.apache.aurora.scheduler.cron.CronException;
import org.apache.aurora.scheduler.cron.CronScheduler;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.StorageStarted;
+import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutateWork;
import org.apache.aurora.scheduler.storage.Storage.Work;
@@ -165,13 +165,13 @@ public class CronJobManager extends JobManager implements EventSubscriber {
}
/**
- * Notifies the cron job manager that storage is started, and job configurations are ready to
+ * Notifies the cron job manager that the scheduler is active, and job configurations are ready to
* load.
*
- * @param storageStarted Event.
+ * @param schedulerActive Event.
*/
@Subscribe
- public void storageStarted(StorageStarted storageStarted) {
+ public void schedulerActive(SchedulerActive schedulerActive) {
cron.start();
shutdownRegistry.addAction(new ExceptionalCommand<CronException>() {
@Override public void execute() throws CronException {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
index 007369c..3dd2271 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
@@ -22,14 +22,12 @@ import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.eventbus.Subscribe;
import com.twitter.common.base.Closure;
-import com.twitter.common.base.Closures;
import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.HostStatus;
@@ -37,9 +35,9 @@ import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.StorageStarted;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.Storage;
@@ -108,13 +106,13 @@ public interface MaintenanceController {
class MaintenanceControllerImpl implements MaintenanceController, EventSubscriber {
private final Storage storage;
private final StateManager stateManager;
- private final Closure<PubsubEvent> eventSink;
+ private final EventSink eventSink;
@Inject
public MaintenanceControllerImpl(
Storage storage,
StateManager stateManager,
- Closure<PubsubEvent> eventSink) {
+ EventSink eventSink) {
this.storage = checkNotNull(storage);
this.stateManager = checkNotNull(stateManager);
@@ -146,36 +144,6 @@ public interface MaintenanceController {
.build();
}
- private Set<HostStatus> watchDrainingTasks(MutableStoreProvider store, Set<String> hosts) {
- return watchDrainingTasks(store, hosts, Closures.<Query.Builder>noop());
- }
-
- private static final Predicate<HostAttributes> IS_DRAINING = new Predicate<HostAttributes>() {
- @Override public boolean apply(HostAttributes attributes) {
- return DRAINING == attributes.getMode();
- }
- };
-
- /**
- * Notifies the MaintenanceController that storage has started, and maintenance statuses are
- * ready to be loaded.
- *
- * @param started Event.
- */
- @Subscribe
- public void storageStarted(StorageStarted started) {
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider storeProvider) {
- Set<String> drainingHosts =
- FluentIterable.from(storeProvider.getAttributeStore().getHostAttributes())
- .filter(IS_DRAINING)
- .transform(HOST_NAME)
- .toSet();
- watchDrainingTasks(storeProvider, drainingHosts);
- }
- });
- }
-
/**
* Notifies the MaintenanceController that a task has changed state
*
@@ -292,7 +260,7 @@ public interface MaintenanceController {
for (String host : hosts) {
if (store.setMaintenanceMode(host, mode)) {
HostStatus status = new HostStatus().setHost(host).setMode(mode);
- eventSink.execute(new PubsubEvent.HostMaintenanceStateChange(status.deepCopy()));
+ eventSink.post(new PubsubEvent.HostMaintenanceStateChange(status.deepCopy()));
statuses.add(status);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java b/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
index 46e1568..2bdd459 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SideEffectStorage.java
@@ -21,8 +21,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import com.twitter.common.base.Closure;
+import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
@@ -47,7 +47,7 @@ class SideEffectStorage {
private final Storage storage;
private final OperationFinalizer operationFinalizer;
- private final Closure<PubsubEvent> taskEventSink;
+ private final EventSink eventSink;
interface OperationFinalizer {
/**
@@ -68,11 +68,11 @@ class SideEffectStorage {
SideEffectStorage(
Storage storage,
OperationFinalizer operationFinalizer,
- Closure<PubsubEvent> taskEventSink) {
+ EventSink eventSink) {
this.storage = checkNotNull(storage);
this.operationFinalizer = checkNotNull(operationFinalizer);
- this.taskEventSink = checkNotNull(taskEventSink);
+ this.eventSink = checkNotNull(eventSink);
}
/**
@@ -154,7 +154,7 @@ class SideEffectStorage {
operationFinalizer.finalize(work, storeProvider);
if (topLevelOperation) {
while (!events.isEmpty()) {
- taskEventSink.execute(events.remove());
+ eventSink.post(events.remove());
}
}
return result;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index 0024222..b6dd537 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -38,7 +38,6 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Atomics;
-import com.twitter.common.base.Closure;
import com.twitter.common.stats.Stats;
import com.twitter.common.util.Clock;
@@ -49,6 +48,7 @@ import org.apache.aurora.scheduler.Driver;
import org.apache.aurora.scheduler.TaskIdGenerator;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent;
import org.apache.aurora.scheduler.state.SideEffectStorage.SideEffectWork;
import org.apache.aurora.scheduler.storage.Storage;
@@ -157,7 +157,7 @@ public class StateManagerImpl implements StateManager {
final Clock clock,
Driver driver,
TaskIdGenerator taskIdGenerator,
- Closure<PubsubEvent> taskEventSink) {
+ EventSink eventSink) {
checkNotNull(storage);
this.clock = checkNotNull(clock);
@@ -168,8 +168,7 @@ public class StateManagerImpl implements StateManager {
}
};
- this.storage = new SideEffectStorage(storage, finalizer, taskEventSink);
-
+ this.storage = new SideEffectStorage(storage, finalizer, eventSink);
this.driver = checkNotNull(driver);
this.taskIdGenerator = checkNotNull(taskIdGenerator);
@@ -369,7 +368,7 @@ public class StateManagerImpl implements StateManager {
}
});
sideEffectWork.addTaskEvent(
- new PubsubEvent.TaskStateChange(
+ PubsubEvent.TaskStateChange.transition(
Iterables.getOnlyElement(taskStore.fetchTasks(idQuery)),
stateMachine.getPreviousState()));
break;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
index cdc7ce6..a556ae2 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
@@ -28,10 +28,15 @@ import com.google.inject.Module;
import com.google.inject.PrivateModule;
import com.twitter.common.util.StateMachine;
-import org.apache.aurora.scheduler.events.PubsubEvent.Interceptors.Event;
-import org.apache.aurora.scheduler.events.PubsubEvent.Interceptors.SendNotification;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+
+import static com.google.common.base.Preconditions.checkNotNull;
/**
* A non-volatile storage wrapper that enforces method call ordering.
@@ -47,6 +52,7 @@ public class CallOrderEnforcingStorage implements NonVolatileStorage {
private @interface EnforceOrderOn { }
private final NonVolatileStorage wrapped;
+ private final EventSink eventSink;
private enum State {
CONSTRUCTED,
@@ -64,8 +70,9 @@ public class CallOrderEnforcingStorage implements NonVolatileStorage {
.build();
@Inject
- CallOrderEnforcingStorage(@EnforceOrderOn NonVolatileStorage wrapped) {
- this.wrapped = wrapped;
+ CallOrderEnforcingStorage(@EnforceOrderOn NonVolatileStorage wrapped, EventSink eventSink) {
+ this.wrapped = checkNotNull(wrapped);
+ this.eventSink = checkNotNull(eventSink);
}
private void checkInState(State state) throws StorageException {
@@ -81,12 +88,20 @@ public class CallOrderEnforcingStorage implements NonVolatileStorage {
stateMachine.transition(State.PREPARED);
}
- @SendNotification(after = Event.StorageStarted)
@Override
public void start(Quiet initializationLogic) throws StorageException {
checkInState(State.PREPARED);
wrapped.start(initializationLogic);
stateMachine.transition(State.READY);
+ wrapped.write(new MutateWork.NoResult.Quiet() {
+ @Override protected void execute(MutableStoreProvider storeProvider) {
+ Iterable<IScheduledTask> tasks = Tasks.LATEST_ACTIVITY.sortedCopy(
+ storeProvider.getTaskStore().fetchTasks(Query.unscoped()));
+ for (IScheduledTask task : tasks) {
+ eventSink.post(TaskStateChange.initialized(task));
+ }
+ }
+ });
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
index 5937e9d..fd4d5d5 100644
--- a/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/MesosSchedulerImplTest.java
@@ -28,17 +28,15 @@ import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.TypeLiteral;
import com.twitter.common.application.Lifecycle;
-import com.twitter.common.base.Closure;
import com.twitter.common.base.Command;
import com.twitter.common.testing.easymock.EasyMockTest;
import org.apache.aurora.scheduler.base.Conversions;
import org.apache.aurora.scheduler.base.SchedulerException;
import org.apache.aurora.scheduler.configuration.Resources;
-import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
-import org.apache.aurora.scheduler.events.PubsubEventModule;
import org.apache.aurora.scheduler.state.SchedulerCore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.StorageException;
@@ -108,7 +106,7 @@ public class MesosSchedulerImplTest extends EasyMockTest {
private TaskLauncher systemLauncher;
private TaskLauncher userLauncher;
private SchedulerDriver driver;
- private Closure<PubsubEvent> eventBus;
+ private EventSink eventSink;
private MesosSchedulerImpl scheduler;
@@ -119,7 +117,7 @@ public class MesosSchedulerImplTest extends EasyMockTest {
new Lifecycle(createMock(Command.class), createMock(UncaughtExceptionHandler.class));
systemLauncher = createMock(TaskLauncher.class);
userLauncher = createMock(TaskLauncher.class);
- eventBus = createMock(new Clazz<Closure<PubsubEvent>>() { });
+ eventSink = createMock(EventSink.class);
Injector injector = Guice.createInjector(new AbstractModule() {
@Override protected void configure() {
@@ -128,8 +126,7 @@ public class MesosSchedulerImplTest extends EasyMockTest {
bind(Lifecycle.class).toInstance(lifecycle);
bind(new TypeLiteral<List<TaskLauncher>>() { })
.toInstance(Arrays.asList(systemLauncher, userLauncher));
- bind(new TypeLiteral<Closure<PubsubEvent>>() { }).toInstance(eventBus);
- PubsubEventModule.bindNotifyingInterceptor(binder());
+ bind(EventSink.class).toInstance(eventSink);
}
});
scheduler = injector.getInstance(MesosSchedulerImpl.class);
@@ -260,7 +257,7 @@ public class MesosSchedulerImplTest extends EasyMockTest {
public void testDisconnected() throws Exception {
new RegisteredFixture() {
@Override void expectations() throws Exception {
- eventBus.execute(new DriverDisconnected());
+ eventSink.post(new DriverDisconnected());
}
@Override void test() {
@@ -287,7 +284,7 @@ public class MesosSchedulerImplTest extends EasyMockTest {
void run() throws Exception {
runCalled.set(true);
- eventBus.execute(new DriverRegistered());
+ eventSink.post(new DriverRegistered());
storageUtil.expectOperations();
storageUtil.schedulerStore.saveFrameworkId(FRAMEWORK_ID);
expectations();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
index fc789e7..24cd6fa 100644
--- a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
@@ -26,7 +26,9 @@ import com.twitter.common.zookeeper.SingletonService.LeadershipListener;
import org.apache.aurora.scheduler.SchedulerLifecycle.DelayedActions;
import org.apache.aurora.scheduler.SchedulerLifecycle.DriverReference;
+import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
+import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.apache.mesos.Protos.Status;
@@ -52,6 +54,7 @@ public class SchedulerLifecycleTest extends EasyMockTest {
private LeaderControl leaderControl;
private SchedulerDriver schedulerDriver;
private DelayedActions delayedActions;
+ private EventSink eventSink;
private SchedulerLifecycle schedulerLifecycle;
@@ -65,6 +68,7 @@ public class SchedulerLifecycleTest extends EasyMockTest {
leaderControl = createMock(LeaderControl.class);
schedulerDriver = createMock(SchedulerDriver.class);
delayedActions = createMock(DelayedActions.class);
+ eventSink = createMock(EventSink.class);
schedulerLifecycle = new SchedulerLifecycle(
driverFactory,
storageUtil.storage,
@@ -76,7 +80,8 @@ public class SchedulerLifecycleTest extends EasyMockTest {
driver,
driverRef,
delayedActions,
- createMock(Clock.class));
+ createMock(Clock.class),
+ eventSink);
}
@Test
@@ -98,6 +103,7 @@ public class SchedulerLifecycleTest extends EasyMockTest {
delayedActions.blockingDriverJoin(EasyMock.<Runnable>anyObject());
leaderControl.advertise();
+ eventSink.post(new SchedulerActive());
leaderControl.leave();
driver.stop();
storageUtil.storage.stop();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java b/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
index f09acfa..dde053c 100644
--- a/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
@@ -31,8 +31,7 @@ import org.apache.aurora.gen.Identity;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.events.PubsubEvent.StorageStarted;
+import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -67,28 +66,31 @@ public class TaskVarsTest extends EasyMockTest {
public void setUp() {
storageUtil = new StorageTestUtil(this);
trackedStats = createMock(StatsProvider.class);
+ vars = new TaskVars(storageUtil.storage, trackedStats);
+
+ storageUtil.expectOperations();
+ globalCounters = Maps.newHashMap();
}
- private void initialize() {
- vars = new TaskVars(storageUtil.storage, trackedStats);
- vars.storageStarted(new StorageStarted());
+ private void expectStatusCountersInitialized() {
+ for (ScheduleStatus status : ScheduleStatus.values()) {
+ AtomicLong counter = new AtomicLong(0);
+ globalCounters.put(status, counter);
+ expect(trackedStats.makeCounter(TaskVars.getVarName(status))).andReturn(counter);
+ }
}
private void changeState(IScheduledTask task, ScheduleStatus status) {
- vars.taskChangedState(new TaskStateChange(
+ vars.taskChangedState(TaskStateChange.transition(
IScheduledTask.build(task.newBuilder().setStatus(status)),
task.getStatus()));
}
- private void expectLoadStorage(IScheduledTask... result) {
- storageUtil.expectOperations();
- storageUtil.expectTaskFetch(Query.unscoped(), result);
- globalCounters = Maps.newHashMap();
- for (ScheduleStatus status : ScheduleStatus.values()) {
- AtomicLong counter = new AtomicLong(0);
- globalCounters.put(status, counter);
- expect(trackedStats.makeCounter(TaskVars.getVarName(status))).andReturn(counter);
+ private void schedulerActivated(IScheduledTask... initialTasks) {
+ for (IScheduledTask task : initialTasks) {
+ vars.taskChangedState(TaskStateChange.initialized(task));
}
+ vars.schedulerActive(new SchedulerActive());
}
private IScheduledTask makeTask(String job, ScheduleStatus status, String host) {
@@ -114,10 +116,10 @@ public class TaskVarsTest extends EasyMockTest {
@Test
public void testStartsAtZero() {
- expectLoadStorage();
+ expectStatusCountersInitialized();
control.replay();
- initialize();
+ schedulerActivated();
assertAllZero();
}
@@ -132,10 +134,10 @@ public class TaskVarsTest extends EasyMockTest {
@Test
public void testTaskLifeCycle() {
- expectLoadStorage();
+ expectStatusCountersInitialized();
control.replay();
- initialize();
+ schedulerActivated();
IScheduledTask taskA = makeTask(JOB_A, INIT);
changeState(taskA, PENDING);
@@ -156,14 +158,15 @@ public class TaskVarsTest extends EasyMockTest {
@Test
public void testLoadsFromStorage() {
- expectLoadStorage(
+ expectStatusCountersInitialized();
+
+ control.replay();
+ schedulerActivated(
makeTask(JOB_A, PENDING),
makeTask(JOB_A, RUNNING),
makeTask(JOB_A, FINISHED),
makeTask(JOB_B, PENDING),
makeTask(JOB_B, FAILED));
- control.replay();
- initialize();
assertEquals(2, globalCounters.get(PENDING).get());
assertEquals(1, globalCounters.get(RUNNING).get());
@@ -182,7 +185,7 @@ public class TaskVarsTest extends EasyMockTest {
@Test
public void testLostCounters() {
- expectLoadStorage();
+ expectStatusCountersInitialized();
expectGetHostRack("host1", "rackA").atLeastOnce();
expectGetHostRack("host2", "rackB").atLeastOnce();
expectGetHostRack("host3", "rackB").atLeastOnce();
@@ -193,7 +196,7 @@ public class TaskVarsTest extends EasyMockTest {
expect(trackedStats.makeCounter(TaskVars.rackStatName("rackB"))).andReturn(rackB);
control.replay();
- initialize();
+ schedulerActivated();
IScheduledTask a = makeTask("jobA", RUNNING, "host1");
IScheduledTask b = makeTask("jobB", RUNNING, "host2");
@@ -211,29 +214,15 @@ public class TaskVarsTest extends EasyMockTest {
@Test
public void testRackMissing() {
- expectLoadStorage();
+ expectStatusCountersInitialized();
expect(storageUtil.attributeStore.getHostAttributes("a"))
.andReturn(Optional.<HostAttributes>absent());
control.replay();
- initialize();
+ schedulerActivated();
IScheduledTask a = makeTask(JOB_A, RUNNING, "a");
changeState(a, LOST);
// Since no attributes are stored for the host, a variable is not exported/updated.
}
-
- @Test
- public void testDeleteBeforeStorageStarted() {
- expectLoadStorage();
-
- control.replay();
-
- vars = new TaskVars(storageUtil.storage, trackedStats);
- vars.tasksDeleted(new TasksDeleted(ImmutableSet.of(makeTask("jobA", RUNNING, "host1"))));
- assertEquals(0, globalCounters.get(RUNNING).get());
-
- vars.storageStarted(new StorageStarted());
- assertEquals(0, globalCounters.get(RUNNING).get());
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
index 530254a..afb7ce9 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
@@ -39,12 +39,10 @@ import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.TaskEvent;
import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.events.PubsubEvent.StorageStarted;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
@@ -78,7 +76,6 @@ public class HistoryPrunerTest extends EasyMockTest {
private ScheduledFuture<?> future;
private ScheduledExecutorService executor;
private FakeClock clock;
- private StorageTestUtil storageUtil;
private StateManager stateManager;
private HistoryPruner pruner;
@@ -87,12 +84,9 @@ public class HistoryPrunerTest extends EasyMockTest {
future = createMock(new Clazz<ScheduledFuture<?>>() { });
executor = createMock(ScheduledExecutorService.class);
clock = new FakeClock();
- storageUtil = new StorageTestUtil(this);
- storageUtil.expectOperations();
stateManager = createMock(StateManager.class);
pruner = new HistoryPruner(
executor,
- storageUtil.storage,
stateManager,
clock,
ONE_DAY,
@@ -109,16 +103,7 @@ public class HistoryPrunerTest extends EasyMockTest {
}
@Test
- public void testNoTasksOnStorageStart() {
- expectGetInactiveTasks();
-
- control.replay();
-
- pruner.storageStarted(new StorageStarted());
- }
-
- @Test
- public void testStorageStartWithoutPruning() {
+ public void testNoPruning() {
long taskATimestamp = clock.nowMillis();
IScheduledTask a = makeTask("a", FINISHED);
@@ -126,7 +111,6 @@ public class HistoryPrunerTest extends EasyMockTest {
long taskBTimestamp = clock.nowMillis();
IScheduledTask b = makeTask("b", LOST);
- expectGetInactiveTasks(a, b);
expectOneTaskWatch(taskATimestamp);
expectOneTaskWatch(taskBTimestamp);
@@ -134,7 +118,8 @@ public class HistoryPrunerTest extends EasyMockTest {
control.replay();
- pruner.storageStarted(new StorageStarted());
+ pruner.recordStateChange(TaskStateChange.initialized(a));
+ pruner.recordStateChange(TaskStateChange.initialized(b));
// Clean-up
pruner.tasksDeleted(new TasksDeleted(ImmutableSet.of(a, b)));
@@ -157,7 +142,6 @@ public class HistoryPrunerTest extends EasyMockTest {
IScheduledTask d = makeTask("d", FINISHED);
IScheduledTask e = makeTask("job-x", "e", FINISHED);
- expectGetInactiveTasks(a, b, c, d, e);
expectOneTaskWatch(taskATimestamp);
expectOneTaskWatch(taskBTimestamp);
expectOneTaskWatch(taskCTimestamp);
@@ -176,7 +160,9 @@ public class HistoryPrunerTest extends EasyMockTest {
control.replay();
- pruner.storageStarted(new StorageStarted());
+ for (IScheduledTask task : ImmutableList.of(a, b, c, d, e)) {
+ pruner.recordStateChange(TaskStateChange.initialized(task));
+ }
// Clean-up
pruner.tasksDeleted(new TasksDeleted(ImmutableSet.of(c, d, e)));
@@ -243,13 +229,12 @@ public class HistoryPrunerTest extends EasyMockTest {
public void testTasksDeleted() {
IScheduledTask a = makeTask("a", FINISHED);
IScheduledTask b = makeTask("b", FINISHED);
- expectGetInactiveTasks(a);
expectDefaultTaskWatch();
expectCancelFuture();
control.replay();
- pruner.storageStarted(new StorageStarted());
+ pruner.recordStateChange(TaskStateChange.initialized(a));
// Cancels existing future for task 'a'
pruner.tasksDeleted(new TasksDeleted(ImmutableSet.of(a)));
@@ -315,7 +300,6 @@ public class HistoryPrunerTest extends EasyMockTest {
.build());
return new HistoryPruner(
realExecutor,
- storageUtil.storage,
stateManager,
clock,
Amount.of(1L, Time.MILLISECONDS),
@@ -392,15 +376,10 @@ public class HistoryPrunerTest extends EasyMockTest {
private IScheduledTask changeState(String taskId, ScheduleStatus from, ScheduleStatus to) {
IScheduledTask task = makeTask(taskId, to);
- pruner.recordStateChange(new PubsubEvent.TaskStateChange(task, from));
+ pruner.recordStateChange(TaskStateChange.transition(task, from));
return task;
}
- private void expectGetInactiveTasks(IScheduledTask... tasks) {
- expect(storageUtil.taskStore.fetchTasks(HistoryPruner.INACTIVE_QUERY))
- .andReturn(ImmutableSet.copyOf(tasks));
- }
-
private IScheduledTask makeTask(
String job,
String taskId,
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index 4c17176..35dd666 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableSet;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
-import com.twitter.common.base.Closure;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.testing.easymock.EasyMockTest;
@@ -33,7 +32,8 @@ import org.apache.aurora.gen.Identity;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.state.PubsubTestUtil;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.state.TaskAssigner;
@@ -62,7 +62,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
private Preemptor preemptor;
private Amount<Long, Time> reservationDuration;
private Amount<Long, Time> halfReservationDuration;
- private Closure<PubsubEvent> eventSink;
+ private EventSink eventSink;
@Before
public void setUp() throws Exception {
@@ -163,7 +163,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
assertEquals(scheduler.schedule("a"), TaskScheduler.TaskSchedulerResult.TRY_AGAIN);
assertEquals(scheduler.schedule("a"), TaskScheduler.TaskSchedulerResult.SUCCESS);
firstAssignment.getValue().apply(offerA);
- eventSink.execute(new PubsubEvent.TaskStateChange(taskA, PENDING));
+ eventSink.post(TaskStateChange.transition(taskA, PENDING));
clock.advance(halfReservationDuration);
assertEquals(scheduler.schedule("b"), TaskScheduler.TaskSchedulerResult.SUCCESS);
secondAssignment.getValue().apply(offerA);
@@ -222,7 +222,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
assertEquals(scheduler.schedule("a"), TaskScheduler.TaskSchedulerResult.TRY_AGAIN);
clock.advance(halfReservationDuration);
// Task is killed by user before it is scheduled
- eventSink.execute(new PubsubEvent.TaskStateChange(taskA, PENDING));
+ eventSink.post(TaskStateChange.transition(taskA, PENDING));
assertEquals(scheduler.schedule("b"), TaskScheduler.TaskSchedulerResult.SUCCESS);
assignment.getValue().apply(offerA);
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 8bc15a4..9698f28 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -24,6 +24,7 @@ import javax.annotation.Nullable;
import com.google.common.base.Function;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.RateLimiter;
import com.twitter.common.quantity.Amount;
@@ -48,7 +49,6 @@ import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.HostMaintenanceStateChange;
-import org.apache.aurora.scheduler.events.PubsubEvent.StorageStarted;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
import org.apache.aurora.scheduler.state.MaintenanceController;
@@ -144,7 +144,6 @@ public class TaskSchedulerTest extends EasyMockTest {
clock);
taskGroups = new TaskGroups(
executor,
- storage,
retryStrategy,
rateLimiter,
scheduler,
@@ -185,7 +184,7 @@ public class TaskSchedulerTest extends EasyMockTest {
}
}
});
- taskGroups.taskChangedState(new TaskStateChange(copy, oldState));
+ taskGroups.taskChangedState(TaskStateChange.transition(copy, oldState));
}
private Capture<Runnable> expectTaskRetryIn(long penaltyMs) {
@@ -253,16 +252,17 @@ public class TaskSchedulerTest extends EasyMockTest {
replayAndCreateScheduler();
+ final IScheduledTask a = makeTask("a", KILLED);
+ final IScheduledTask b = makeTask("b", PENDING);
final IScheduledTask c = makeTask("c", RUNNING);
storage.write(new MutateWork.NoResult.Quiet() {
@Override protected void execute(MutableStoreProvider store) {
- store.getUnsafeTaskStore().saveTasks(ImmutableSet.of(
- makeTask("a", KILLED),
- makeTask("b", PENDING),
- c));
+ store.getUnsafeTaskStore().saveTasks(ImmutableSet.of(a, b, c));
}
});
- taskGroups.storageStarted(new StorageStarted());
+ for (IScheduledTask task : ImmutableList.of(a, b, c)) {
+ taskGroups.taskChangedState(TaskStateChange.initialized(task));
+ }
changeState(c, RUNNING, FINISHED);
}
@@ -272,7 +272,7 @@ public class TaskSchedulerTest extends EasyMockTest {
replayAndCreateScheduler();
- taskGroups.taskChangedState(new TaskStateChange(makeTask("a", PENDING), INIT));
+ taskGroups.taskChangedState(TaskStateChange.transition(makeTask("a", PENDING), INIT));
timeoutCapture.getValue().run();
}
@@ -581,7 +581,8 @@ public class TaskSchedulerTest extends EasyMockTest {
// Ensure the offer was consumed.
changeState(task, INIT, PENDING);
storage.write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider storeProvider) {
+ @Override
+ protected void execute(MutableStoreProvider storeProvider) {
storeProvider.getUnsafeTaskStore().deleteTasks(Tasks.ids(task));
}
});
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
index 375b6c2..023905b 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
@@ -36,11 +36,9 @@ import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.TaskEvent;
import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.events.PubsubEvent.StorageStarted;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IExpectationSetters;
@@ -73,7 +71,6 @@ public class TaskTimeoutTest extends EasyMockTest {
private Capture<Supplier<Number>> stateCountCapture;
private Map<ScheduleStatus, Capture<Supplier<Number>>> stateCaptures;
- private StorageTestUtil storageUtil;
private ScheduledExecutorService executor;
private ScheduledFuture<?> future;
private StateManager stateManager;
@@ -83,8 +80,6 @@ public class TaskTimeoutTest extends EasyMockTest {
@Before
public void setUp() {
- storageUtil = new StorageTestUtil(this);
- storageUtil.expectOperations();
executor = createMock(ScheduledExecutorService.class);
future = createMock(new Clazz<ScheduledFuture<?>>() { });
stateManager = createMock(StateManager.class);
@@ -122,7 +117,6 @@ public class TaskTimeoutTest extends EasyMockTest {
private void replayAndCreate() {
control.replay();
timeout = new TaskTimeout(
- storageUtil.storage,
executor,
stateManager,
clock,
@@ -152,7 +146,7 @@ public class TaskTimeoutTest extends EasyMockTest {
IScheduledTask task = IScheduledTask.build(new ScheduledTask()
.setStatus(to)
.setAssignedTask(new AssignedTask().setTaskId(taskId)));
- timeout.recordStateChange(new TaskStateChange(task, from));
+ timeout.recordStateChange(TaskStateChange.transition(task, from));
}
private void changeState(ScheduleStatus from, ScheduleStatus to) {
@@ -234,13 +228,7 @@ public class TaskTimeoutTest extends EasyMockTest {
@Test
public void testStorageStart() {
- clock.setNowMillis(TIMEOUT_MS * 2);
- storageUtil.expectTaskFetch(
- TaskTimeout.TRANSIENT_QUERY,
- makeTask("a", ASSIGNED, 0),
- makeTask("b", KILLING, TIMEOUT_MS),
- makeTask("c", PREEMPTING, TIMEOUT_MS * 3) /* In the future */
- );
+
expectTaskWatch(TIMEOUT_MS);
expectTaskWatch(TIMEOUT_MS);
expectTaskWatch(TIMEOUT_MS);
@@ -248,24 +236,18 @@ public class TaskTimeoutTest extends EasyMockTest {
replayAndCreate();
- timeout.storageStarted(new StorageStarted());
- changeState("a", ASSIGNED, RUNNING);
- changeState("b", KILLING, KILLED);
- changeState("c", PREEMPTING, FINISHED);
- }
-
- @Test
- public void testStorageStartTwice() {
- // This should never happen, but testing that the class handles it gracefully.
- storageUtil.expectTaskFetch(TaskTimeout.TRANSIENT_QUERY, makeTask("a", ASSIGNED, 0)).times(2);
- expectTaskWatch();
- expectCancel();
+ clock.setNowMillis(TIMEOUT_MS * 2);
+ for (IScheduledTask task : ImmutableList.of(
+ makeTask("a", ASSIGNED, 0),
+ makeTask("b", KILLING, TIMEOUT_MS),
+ makeTask("c", PREEMPTING, clock.nowMillis() + TIMEOUT_MS))) {
- replayAndCreate();
+ timeout.recordStateChange(TaskStateChange.initialized(task));
+ }
- timeout.storageStarted(new StorageStarted());
- timeout.storageStarted(new StorageStarted());
changeState("a", ASSIGNED, RUNNING);
+ changeState("b", KILLING, KILLED);
+ changeState("c", PREEMPTING, FINISHED);
}
private void checkOutstandingTimer(ScheduleStatus status, long expectedValue) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/d0b46fc3/src/test/java/org/apache/aurora/scheduler/events/NotifyingMethodInterceptorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/NotifyingMethodInterceptorTest.java b/src/test/java/org/apache/aurora/scheduler/events/NotifyingMethodInterceptorTest.java
deleted file mode 100644
index a38d208..0000000
--- a/src/test/java/org/apache/aurora/scheduler/events/NotifyingMethodInterceptorTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.events;
-
-import javax.inject.Singleton;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.TypeLiteral;
-import com.google.inject.matcher.Matchers;
-import com.twitter.common.base.Closure;
-import com.twitter.common.testing.easymock.EasyMockTest;
-
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
-import org.apache.aurora.scheduler.events.PubsubEvent.Interceptors.Event;
-import org.apache.aurora.scheduler.events.PubsubEvent.Interceptors.SendNotification;
-import org.apache.aurora.scheduler.events.PubsubEvent.StorageStarted;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class NotifyingMethodInterceptorTest extends EasyMockTest {
-
- private Closure<PubsubEvent> eventSink;
- private NotifyingMethodInterceptor interceptor;
-
- @Before
- public void setUp() throws Exception {
- eventSink = createMock(new Clazz<Closure<PubsubEvent>>() { });
- Injector injector = Guice.createInjector(new AbstractModule() {
- @Override protected void configure() {
- bind(new TypeLiteral<Closure<PubsubEvent>>() { }).toInstance(eventSink);
- NotifyingMethodInterceptor bound = new NotifyingMethodInterceptor();
- bind(NotifyingMethodInterceptor.class).toInstance(bound);
- requestInjection(bound);
- }
- });
- interceptor = injector.getInstance(NotifyingMethodInterceptor.class);
- }
-
- @Test
- public void testNotifications() {
- eventSink.execute(new DriverRegistered());
- eventSink.execute(new StorageStarted());
- eventSink.execute(new DriverRegistered());
-
- control.replay();
-
- Injector injector = Guice.createInjector(new AbstractModule() {
- @Override protected void configure() {
- bind(Math.class).in(Singleton.class);
- bindInterceptor(
- Matchers.any(),
- Matchers.annotatedWith(SendNotification.class),
- interceptor);
- }
- });
-
- Math math = injector.getInstance(Math.class);
- assertEquals(4, math.add(2, 2));
- assertEquals(0, math.subtract(2, 2));
- assertEquals(4, math.multiply(2, 2));
- assertEquals(1, math.divide(2, 2));
- }
-
- static class Math {
- @SendNotification(before = Event.DriverRegistered, after = Event.StorageStarted)
- int add(int a, int b) {
- return a + b;
- }
-
- @SendNotification(after = Event.DriverRegistered)
- int subtract(int a, int b) {
- return a - b;
- }
-
- @SendNotification
- int multiply(int a, int b) {
- return a * b;
- }
-
- int divide(int a, int b) {
- return a / b;
- }
- }
-}