You are viewing a plain-text version of this content; the hyperlink to its canonical version was lost in the plain-text conversion.
Posted to commits@aurora.apache.org by ma...@apache.org on 2013/12/19 02:59:42 UTC
git commit: Adding random jitter for the initial GC task scheduling to better spread GC execution over time.
Updated Branches:
refs/heads/master 4f1c62188 -> 43344575d
Adding random jitter for the initial GC task scheduling to better spread GC execution over time.
Reviewed at https://reviews.apache.org/r/16247/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/43344575
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/43344575
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/43344575
Branch: refs/heads/master
Commit: 43344575d0e4995cb4a2d4e4b00fd117ddb2567c
Parents: 4f1c621
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Wed Dec 18 17:58:47 2013 -0800
Committer: Maxim Khutornenko <mk...@twitter.com>
Committed: Wed Dec 18 17:58:47 2013 -0800
----------------------------------------------------------------------
.../twitter/aurora/scheduler/PulseMonitor.java | 95 --------------------
.../aurora/scheduler/SchedulerModule.java | 13 ++-
.../scheduler/periodic/GcExecutorLauncher.java | 89 ++++++++++++------
.../periodic/GcExecutorLauncherTest.java | 63 +++++++------
4 files changed, 106 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/43344575/src/main/java/com/twitter/aurora/scheduler/PulseMonitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/PulseMonitor.java b/src/main/java/com/twitter/aurora/scheduler/PulseMonitor.java
deleted file mode 100644
index e31ce3d..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/PulseMonitor.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Supplier;
-import com.google.common.cache.CacheBuilder;
-
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-
-/**
- * A pulse monitor to identify when a pulse has not been received for an item beyond a defined
- * threshold.
- * Also acts as a supplier of the pulsed type, which will provide access to all non-expired
- * entries.
- *
- * @param <T> The type of values to track.
- */
-public interface PulseMonitor<T> extends Supplier<Set<T>> {
-
- /**
- * Receive a pulse for an entry, effectively marking it as alive.
- *
- * @param t Item to update.
- */
- void pulse(T t);
-
- /**
- * Checks if an entry is considered alive, based on the expiration time. Note that if the
- * monitor is created and this method is called before {@link #pulse(Object)},
- * this method will always return {@code false}.
- *
- * @param t Item to check the pulse of.
- * @return {@code true} if a pulse has been received for the entry, and the time between now and
- * the last pulse is less than the expiration period, {@code false} otherwise.
- */
- boolean isAlive(T t);
-
- /**
- * Pulse monitor implementation using a decaying map for time expiration.
- *
- * @param <T> The type of values to track.
- */
- public static class PulseMonitorImpl<T> implements PulseMonitor<T> {
-
- private final Map<T, T> pulses;
-
- /**
- * Creates a new pulse monitor that will consider an entry dead if the time since the last pulse
- * for the entry is greater than {@code expiration}.
- *
- * @param expiration Time after which an entry is considered dead.
- */
- public PulseMonitorImpl(Amount<Long, Time> expiration) {
- // TODO(William Farner): Consider using timestamps instead and allowing exposure of live
- // entries and the time since their last pulse.
- pulses = CacheBuilder.newBuilder()
- .expireAfterWrite(expiration.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS)
- .<T, T>build()
- .asMap();
- }
-
- @Override
- public void pulse(T t) {
- pulses.put(t, t);
- }
-
- @Override
- public boolean isAlive(T t) {
- return pulses.containsKey(t);
- }
-
- @Override
- public Set<T> get() {
- return pulses.keySet();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/43344575/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java b/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java
index 44aeadc..be4c2b1 100644
--- a/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java
+++ b/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java
@@ -34,13 +34,12 @@ import org.apache.mesos.Scheduler;
import org.apache.mesos.SchedulerDriver;
import com.twitter.aurora.scheduler.Driver.DriverImpl;
-import com.twitter.aurora.scheduler.PulseMonitor.PulseMonitorImpl;
import com.twitter.aurora.scheduler.SchedulerLifecycle.DriverReference;
import com.twitter.aurora.scheduler.SchedulerLifecycle.LeadingOptions;
import com.twitter.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
import com.twitter.aurora.scheduler.events.PubsubEventModule;
import com.twitter.aurora.scheduler.periodic.GcExecutorLauncher;
-import com.twitter.aurora.scheduler.periodic.GcExecutorLauncher.GcExecutor;
+import com.twitter.aurora.scheduler.periodic.GcExecutorLauncher.GcExecutorSettings;
import com.twitter.common.args.Arg;
import com.twitter.common.args.CmdLine;
import com.twitter.common.quantity.Amount;
@@ -52,7 +51,7 @@ import com.twitter.common.quantity.Time;
public class SchedulerModule extends AbstractModule {
@CmdLine(name = "executor_gc_interval",
- help = "Interval on which to run the GC executor on a host to clean up dead tasks.")
+ help = "Max interval on which to run the GC executor on a host to clean up dead tasks.")
private static final Arg<Amount<Long, Time>> EXECUTOR_GC_INTERVAL =
Arg.create(Amount.of(1L, Time.HOURS));
@@ -81,11 +80,9 @@ public class SchedulerModule extends AbstractModule {
bind(TaskIdGenerator.class).to(TaskIdGeneratorImpl.class);
- bind(new TypeLiteral<Optional<String>>() { }).annotatedWith(GcExecutor.class)
- .toInstance(Optional.fromNullable(GC_EXECUTOR_PATH.get()));
- bind(new TypeLiteral<PulseMonitor<String>>() { })
- .annotatedWith(GcExecutor.class)
- .toInstance(new PulseMonitorImpl<String>(EXECUTOR_GC_INTERVAL.get()));
+ bind(GcExecutorSettings.class).toInstance(new GcExecutorSettings(
+ EXECUTOR_GC_INTERVAL.get(),
+ Optional.fromNullable(GC_EXECUTOR_PATH.get())));
bind(GcExecutorLauncher.class).in(Singleton.class);
bind(UserTaskLauncher.class).in(Singleton.class);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/43344575/src/main/java/com/twitter/aurora/scheduler/periodic/GcExecutorLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/periodic/GcExecutorLauncher.java b/src/main/java/com/twitter/aurora/scheduler/periodic/GcExecutorLauncher.java
index 5a8e13f..df15854 100644
--- a/src/main/java/com/twitter/aurora/scheduler/periodic/GcExecutorLauncher.java
+++ b/src/main/java/com/twitter/aurora/scheduler/periodic/GcExecutorLauncher.java
@@ -15,18 +15,19 @@
*/
package com.twitter.aurora.scheduler.periodic;
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Maps;
-import com.google.inject.BindingAnnotation;
import com.google.protobuf.ByteString;
import org.apache.mesos.Protos.ExecutorID;
@@ -41,7 +42,6 @@ import com.twitter.aurora.Protobufs;
import com.twitter.aurora.codec.ThriftBinaryCodec;
import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
import com.twitter.aurora.gen.comm.AdjustRetainedTasks;
-import com.twitter.aurora.scheduler.PulseMonitor;
import com.twitter.aurora.scheduler.TaskLauncher;
import com.twitter.aurora.scheduler.base.CommandUtil;
import com.twitter.aurora.scheduler.base.Query;
@@ -51,11 +51,10 @@ import com.twitter.aurora.scheduler.storage.Storage;
import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Data;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.Clock;
+import com.twitter.common.util.Random;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -66,12 +65,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
public class GcExecutorLauncher implements TaskLauncher {
private static final Logger LOG = Logger.getLogger(GcExecutorLauncher.class.getName());
- /**
- * Binding annotation for gc executor-related fields..
- */
- @BindingAnnotation
- @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
- public @interface GcExecutor { }
+ private final AtomicLong tasksCreated = Stats.exportLong("scheduler_gc_tasks_created");
@VisibleForTesting
static final Resources TOTAL_GC_EXECUTOR_RESOURCES =
@@ -88,26 +82,31 @@ public class GcExecutorLauncher implements TaskLauncher {
private static final String SYSTEM_TASK_PREFIX = "system-gc-";
private static final String EXECUTOR_NAME = "aurora.gc";
- private final PulseMonitor<String> pulseMonitor;
- private final Optional<String> gcExecutorPath;
+ private final GcExecutorSettings settings;
private final Storage storage;
+ private final Clock clock;
+ private final Cache<String, Long> pulses;
@Inject
GcExecutorLauncher(
- @GcExecutor PulseMonitor<String> pulseMonitor,
- @GcExecutor Optional<String> gcExecutorPath,
- Storage storage) {
+ GcExecutorSettings settings,
+ Storage storage,
+ Clock clock) {
- this.pulseMonitor = checkNotNull(pulseMonitor);
- this.gcExecutorPath = checkNotNull(gcExecutorPath);
+ this.settings = checkNotNull(settings);
this.storage = checkNotNull(storage);
+ this.clock = checkNotNull(clock);
+
+ this.pulses = CacheBuilder.newBuilder()
+ .expireAfterWrite(settings.getMaxGcInterval(), TimeUnit.MILLISECONDS)
+ .build();
}
@Override
public Optional<TaskInfo> createTask(Offer offer) {
- if (!gcExecutorPath.isPresent()
+ if (!settings.getGcExecutorPath().isPresent()
|| !Resources.from(offer).greaterThanOrEqual(TOTAL_GC_EXECUTOR_RESOURCES)
- || pulseMonitor.isAlive(offer.getHostname())) {
+ || isAlive(offer.getHostname())) {
return Optional.absent();
}
@@ -123,14 +122,15 @@ public class GcExecutorLauncher implements TaskLauncher {
return Optional.absent();
}
- pulseMonitor.pulse(offer.getHostname());
+ tasksCreated.incrementAndGet();
+ pulses.put(offer.getHostname(), clock.nowMillis() + settings.getDelayMs());
ExecutorInfo.Builder executor = ExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(EXECUTOR_NAME))
.setName(EXECUTOR_NAME)
.setSource(offer.getHostname())
.addAllResources(GC_EXECUTOR_RESOURCES.toResourceList())
- .setCommand(CommandUtil.create(gcExecutorPath.get()));
+ .setCommand(CommandUtil.create(settings.getGcExecutorPath().get()));
return Optional.of(TaskInfo.newBuilder().setName("system-gc")
.setTaskId(TaskID.newBuilder().setValue(SYSTEM_TASK_PREFIX + UUID.randomUUID().toString()))
@@ -155,4 +155,41 @@ public class GcExecutorLauncher implements TaskLauncher {
public void cancelOffer(OfferID offer) {
// No-op.
}
+
+ private boolean isAlive(String hostname) {
+ Optional<Long> timestamp = Optional.fromNullable(pulses.getIfPresent(hostname));
+ return timestamp.isPresent() && clock.nowMillis() < timestamp.get();
+ }
+
+ /**
+ * Wraps configuration values for the {@code GcExecutorLauncher}.
+ */
+ public static class GcExecutorSettings {
+ private final Amount<Long, Time> gcInterval;
+ private final Optional<String> gcExecutorPath;
+ private final Random rand = new Random.SystemRandom(new java.util.Random());
+
+ public GcExecutorSettings(
+ Amount<Long, Time> gcInterval,
+ Optional<String> gcExecutorPath) {
+
+ this.gcInterval = checkNotNull(gcInterval);
+ this.gcExecutorPath = checkNotNull(gcExecutorPath);
+ }
+
+ @VisibleForTesting
+ long getMaxGcInterval() {
+ return gcInterval.as(Time.MILLISECONDS);
+ }
+
+ @VisibleForTesting
+ int getDelayMs() {
+ return rand.nextInt(gcInterval.as(Time.MILLISECONDS).intValue());
+ }
+
+ @VisibleForTesting
+ Optional<String> getGcExecutorPath() {
+ return gcExecutorPath;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/43344575/src/test/java/com/twitter/aurora/scheduler/periodic/GcExecutorLauncherTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/twitter/aurora/scheduler/periodic/GcExecutorLauncherTest.java b/src/test/java/com/twitter/aurora/scheduler/periodic/GcExecutorLauncherTest.java
index ac5e310..6124c7a 100644
--- a/src/test/java/com/twitter/aurora/scheduler/periodic/GcExecutorLauncherTest.java
+++ b/src/test/java/com/twitter/aurora/scheduler/periodic/GcExecutorLauncherTest.java
@@ -40,17 +40,22 @@ import com.twitter.aurora.gen.ScheduleStatus;
import com.twitter.aurora.gen.ScheduledTask;
import com.twitter.aurora.gen.TaskConfig;
import com.twitter.aurora.gen.comm.AdjustRetainedTasks;
-import com.twitter.aurora.scheduler.PulseMonitor;
import com.twitter.aurora.scheduler.base.Query;
import com.twitter.aurora.scheduler.base.Tasks;
import com.twitter.aurora.scheduler.configuration.Resources;
+import com.twitter.aurora.scheduler.periodic.GcExecutorLauncher.GcExecutorSettings;
import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
import com.twitter.aurora.scheduler.storage.testing.StorageTestUtil;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.testing.FakeClock;
import static org.easymock.EasyMock.expect;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static com.twitter.aurora.gen.ScheduleStatus.FAILED;
@@ -69,55 +74,61 @@ public class GcExecutorLauncherTest extends EasyMockTest {
private static final String JOB_A = "jobA";
+ private static final Amount<Long, Time> MAX_GC_INTERVAL = Amount.of(1L, Time.HOURS);
+ private static final Optional<String> GC_EXCECUTOR_PATH = Optional.of("nonempty");
+
private final AtomicInteger taskIdCounter = new AtomicInteger();
+ private FakeClock clock;
private StorageTestUtil storageUtil;
- private PulseMonitor<String> hostMonitor;
private GcExecutorLauncher gcExecutorLauncher;
+ private GcExecutorSettings settings;
@Before
public void setUp() {
storageUtil = new StorageTestUtil(this);
+ clock = new FakeClock();
storageUtil.expectOperations();
- hostMonitor = createMock(new Clazz<PulseMonitor<String>>() { });
- gcExecutorLauncher = new GcExecutorLauncher(
- hostMonitor,
- Optional.of("nonempty"),
- storageUtil.storage);
+ settings = createMock(GcExecutorSettings.class);
+ expect(settings.getMaxGcInterval()).andReturn(MAX_GC_INTERVAL.as(Time.MILLISECONDS)).anyTimes();
+ }
+
+ private void replayAndCreate() {
+ control.replay();
+ gcExecutorLauncher = new GcExecutorLauncher(settings, storageUtil.storage, clock);
}
@Test
- public void testPruning() throws ThriftBinaryCodec.CodingException {
+ public void testPruning() throws Exception {
IScheduledTask thermosPrunedTask = makeTask(JOB_A, true, FAILED);
IScheduledTask thermosTask = makeTask(JOB_A, true, FAILED);
IScheduledTask nonThermosTask = makeTask(JOB_A, false, FAILED);
- // Service first createTask - no hosts ready for GC.
- expect(hostMonitor.isAlive(HOST)).andReturn(true);
-
- // Service second createTask - prune no tasks.
- expect(hostMonitor.isAlive(HOST)).andReturn(false);
+ // First call - no tasks to be collected.
expectGetTasksByHost(HOST, thermosPrunedTask, thermosTask, nonThermosTask);
- hostMonitor.pulse(HOST);
+ expect(settings.getDelayMs()).andReturn(Amount.of(30, Time.MINUTES).as(Time.MILLISECONDS));
- // Service third createTask - prune one tasks.
- expect(hostMonitor.isAlive(HOST)).andReturn(false);
+ // Third call - two tasks collected.
expectGetTasksByHost(HOST, thermosPrunedTask);
- hostMonitor.pulse(HOST);
+ expect(settings.getDelayMs()).andReturn(Amount.of(30, Time.MINUTES).as(Time.MILLISECONDS));
- control.replay();
+ expect(settings.getGcExecutorPath()).andReturn(GC_EXCECUTOR_PATH).times(5);
- // First call - hostMonitor returns true, no GC.
- Optional<TaskInfo> taskInfo = gcExecutorLauncher.createTask(OFFER);
- assertFalse(taskInfo.isPresent());
+ replayAndCreate();
- // Second call - no tasks pruned.
- taskInfo = gcExecutorLauncher.createTask(OFFER);
+ // First call - no items in the cache, no tasks collected.
+ Optional<TaskInfo> taskInfo = gcExecutorLauncher.createTask(OFFER);
assertTrue(taskInfo.isPresent());
assertRetainedTasks(taskInfo.get(), thermosPrunedTask, thermosTask, nonThermosTask);
ExecutorInfo executor1 = taskInfo.get().getExecutor();
- // Third call - two tasks pruned.
+ // Second call - host item alive, no tasks collected.
+ clock.advance(Amount.of(15L, Time.MINUTES));
+ taskInfo = gcExecutorLauncher.createTask(OFFER);
+ assertFalse(taskInfo.isPresent());
+
+ // Third call - two tasks collected.
+ clock.advance(Amount.of(15L, Time.MINUTES));
taskInfo = gcExecutorLauncher.createTask(OFFER);
assertTrue(taskInfo.isPresent());
assertRetainedTasks(taskInfo.get(), thermosPrunedTask);
@@ -128,7 +139,8 @@ public class GcExecutorLauncherTest extends EasyMockTest {
@Test
public void testNoAcceptingSmallOffers() {
- control.replay();
+ expect(settings.getGcExecutorPath()).andReturn(GC_EXCECUTOR_PATH);
+ replayAndCreate();
Iterable<Resource> resources =
Resources.subtract(
@@ -146,6 +158,7 @@ public class GcExecutorLauncherTest extends EasyMockTest {
AdjustRetainedTasks message = ThriftBinaryCodec.decode(
AdjustRetainedTasks.class, taskInfo.getData().toByteArray());
Map<String, IScheduledTask> byId = Tasks.mapById(ImmutableSet.copyOf(tasks));
+ assertNotNull(message);
assertEquals(Maps.transformValues(byId, Tasks.GET_STATUS), message.getRetainedTasks());
}