You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2013/12/19 02:59:42 UTC

git commit: Adding random jitter for the initial GC task scheduling to better spread GC execution over time.

Updated Branches:
  refs/heads/master 4f1c62188 -> 43344575d


Adding random jitter for the initial GC task scheduling to better spread GC execution over time.

Reviewed at https://reviews.apache.org/r/16247/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/43344575
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/43344575
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/43344575

Branch: refs/heads/master
Commit: 43344575d0e4995cb4a2d4e4b00fd117ddb2567c
Parents: 4f1c621
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Wed Dec 18 17:58:47 2013 -0800
Committer: Maxim Khutornenko <mk...@twitter.com>
Committed: Wed Dec 18 17:58:47 2013 -0800

----------------------------------------------------------------------
 .../twitter/aurora/scheduler/PulseMonitor.java  | 95 --------------------
 .../aurora/scheduler/SchedulerModule.java       | 13 ++-
 .../scheduler/periodic/GcExecutorLauncher.java  | 89 ++++++++++++------
 .../periodic/GcExecutorLauncherTest.java        | 63 +++++++------
 4 files changed, 106 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/43344575/src/main/java/com/twitter/aurora/scheduler/PulseMonitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/PulseMonitor.java b/src/main/java/com/twitter/aurora/scheduler/PulseMonitor.java
deleted file mode 100644
index e31ce3d..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/PulseMonitor.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Supplier;
-import com.google.common.cache.CacheBuilder;
-
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-
-/**
- * A pulse monitor to identify when a pulse has not been received for an item beyond a defined
- * threshold.
- * Also acts as a supplier of the pulsed type, which will provide access to all non-expired
- * entries.
- *
- * @param <T> The type of values to track.
- */
-public interface PulseMonitor<T> extends Supplier<Set<T>> {
-
-  /**
-   * Receive a pulse for an entry, effectively marking it as alive.
-   *
-   * @param t Item to update.
-   */
-  void pulse(T t);
-
-  /**
-   * Checks if an entry is considered alive, based on the expiration time.  Note that if the
-   * monitor is created and this method is called before {@link #pulse(Object)},
-   * this method will always return {@code false}.
-   *
-   * @param t Item to check the pulse of.
-   * @return {@code true} if a pulse has been received for the entry, and the time between now and
-   *    the last pulse is less than the expiration period, {@code false} otherwise.
-   */
-  boolean isAlive(T t);
-
-  /**
-   * Pulse monitor implementation using a decaying map for time expiration.
-   *
-   * @param <T> The type of values to track.
-   */
-  public static class PulseMonitorImpl<T> implements PulseMonitor<T> {
-
-    private final Map<T, T> pulses;
-
-    /**
-     * Creates a new pulse monitor that will consider an entry dead if the time since the last pulse
-     * for the entry is greater than {@code expiration}.
-     *
-     * @param expiration Time after which an entry is considered dead.
-     */
-    public PulseMonitorImpl(Amount<Long, Time> expiration) {
-      // TODO(William Farner): Consider using timestamps instead and allowing exposure of live
-      // entries and the time since their last pulse.
-      pulses = CacheBuilder.newBuilder()
-          .expireAfterWrite(expiration.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS)
-          .<T, T>build()
-          .asMap();
-    }
-
-    @Override
-    public void pulse(T t) {
-      pulses.put(t, t);
-    }
-
-    @Override
-    public boolean isAlive(T t) {
-      return pulses.containsKey(t);
-    }
-
-    @Override
-    public Set<T> get() {
-      return pulses.keySet();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/43344575/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java b/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java
index 44aeadc..be4c2b1 100644
--- a/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java
+++ b/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java
@@ -34,13 +34,12 @@ import org.apache.mesos.Scheduler;
 import org.apache.mesos.SchedulerDriver;
 
 import com.twitter.aurora.scheduler.Driver.DriverImpl;
-import com.twitter.aurora.scheduler.PulseMonitor.PulseMonitorImpl;
 import com.twitter.aurora.scheduler.SchedulerLifecycle.DriverReference;
 import com.twitter.aurora.scheduler.SchedulerLifecycle.LeadingOptions;
 import com.twitter.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
 import com.twitter.aurora.scheduler.events.PubsubEventModule;
 import com.twitter.aurora.scheduler.periodic.GcExecutorLauncher;
-import com.twitter.aurora.scheduler.periodic.GcExecutorLauncher.GcExecutor;
+import com.twitter.aurora.scheduler.periodic.GcExecutorLauncher.GcExecutorSettings;
 import com.twitter.common.args.Arg;
 import com.twitter.common.args.CmdLine;
 import com.twitter.common.quantity.Amount;
@@ -52,7 +51,7 @@ import com.twitter.common.quantity.Time;
 public class SchedulerModule extends AbstractModule {
 
   @CmdLine(name = "executor_gc_interval",
-      help = "Interval on which to run the GC executor on a host to clean up dead tasks.")
+      help = "Max interval on which to run the GC executor on a host to clean up dead tasks.")
   private static final Arg<Amount<Long, Time>> EXECUTOR_GC_INTERVAL =
       Arg.create(Amount.of(1L, Time.HOURS));
 
@@ -81,11 +80,9 @@ public class SchedulerModule extends AbstractModule {
 
     bind(TaskIdGenerator.class).to(TaskIdGeneratorImpl.class);
 
-    bind(new TypeLiteral<Optional<String>>() { }).annotatedWith(GcExecutor.class)
-        .toInstance(Optional.fromNullable(GC_EXECUTOR_PATH.get()));
-    bind(new TypeLiteral<PulseMonitor<String>>() { })
-        .annotatedWith(GcExecutor.class)
-        .toInstance(new PulseMonitorImpl<String>(EXECUTOR_GC_INTERVAL.get()));
+    bind(GcExecutorSettings.class).toInstance(new GcExecutorSettings(
+        EXECUTOR_GC_INTERVAL.get(),
+        Optional.fromNullable(GC_EXECUTOR_PATH.get())));
 
     bind(GcExecutorLauncher.class).in(Singleton.class);
     bind(UserTaskLauncher.class).in(Singleton.class);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/43344575/src/main/java/com/twitter/aurora/scheduler/periodic/GcExecutorLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/periodic/GcExecutorLauncher.java b/src/main/java/com/twitter/aurora/scheduler/periodic/GcExecutorLauncher.java
index 5a8e13f..df15854 100644
--- a/src/main/java/com/twitter/aurora/scheduler/periodic/GcExecutorLauncher.java
+++ b/src/main/java/com/twitter/aurora/scheduler/periodic/GcExecutorLauncher.java
@@ -15,18 +15,19 @@
  */
 package com.twitter.aurora.scheduler.periodic;
 
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Maps;
-import com.google.inject.BindingAnnotation;
 import com.google.protobuf.ByteString;
 
 import org.apache.mesos.Protos.ExecutorID;
@@ -41,7 +42,6 @@ import com.twitter.aurora.Protobufs;
 import com.twitter.aurora.codec.ThriftBinaryCodec;
 import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
 import com.twitter.aurora.gen.comm.AdjustRetainedTasks;
-import com.twitter.aurora.scheduler.PulseMonitor;
 import com.twitter.aurora.scheduler.TaskLauncher;
 import com.twitter.aurora.scheduler.base.CommandUtil;
 import com.twitter.aurora.scheduler.base.Query;
@@ -51,11 +51,10 @@ import com.twitter.aurora.scheduler.storage.Storage;
 import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Data;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.Clock;
+import com.twitter.common.util.Random;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -66,12 +65,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 public class GcExecutorLauncher implements TaskLauncher {
   private static final Logger LOG = Logger.getLogger(GcExecutorLauncher.class.getName());
 
-  /**
-   * Binding annotation for gc executor-related fields..
-   */
-  @BindingAnnotation
-  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-  public @interface GcExecutor { }
+  private final AtomicLong tasksCreated = Stats.exportLong("scheduler_gc_tasks_created");
 
   @VisibleForTesting
   static final Resources TOTAL_GC_EXECUTOR_RESOURCES =
@@ -88,26 +82,31 @@ public class GcExecutorLauncher implements TaskLauncher {
   private static final String SYSTEM_TASK_PREFIX = "system-gc-";
   private static final String EXECUTOR_NAME = "aurora.gc";
 
-  private final PulseMonitor<String> pulseMonitor;
-  private final Optional<String> gcExecutorPath;
+  private final GcExecutorSettings settings;
   private final Storage storage;
+  private final Clock clock;
+  private final Cache<String, Long> pulses;
 
   @Inject
   GcExecutorLauncher(
-      @GcExecutor PulseMonitor<String> pulseMonitor,
-      @GcExecutor Optional<String> gcExecutorPath,
-      Storage storage) {
+      GcExecutorSettings settings,
+      Storage storage,
+      Clock clock) {
 
-    this.pulseMonitor = checkNotNull(pulseMonitor);
-    this.gcExecutorPath = checkNotNull(gcExecutorPath);
+    this.settings = checkNotNull(settings);
     this.storage = checkNotNull(storage);
+    this.clock = checkNotNull(clock);
+
+    this.pulses = CacheBuilder.newBuilder()
+        .expireAfterWrite(settings.getMaxGcInterval(), TimeUnit.MILLISECONDS)
+        .build();
   }
 
   @Override
   public Optional<TaskInfo> createTask(Offer offer) {
-    if (!gcExecutorPath.isPresent()
+    if (!settings.getGcExecutorPath().isPresent()
         || !Resources.from(offer).greaterThanOrEqual(TOTAL_GC_EXECUTOR_RESOURCES)
-        || pulseMonitor.isAlive(offer.getHostname())) {
+        || isAlive(offer.getHostname())) {
       return Optional.absent();
     }
 
@@ -123,14 +122,15 @@ public class GcExecutorLauncher implements TaskLauncher {
       return Optional.absent();
     }
 
-    pulseMonitor.pulse(offer.getHostname());
+    tasksCreated.incrementAndGet();
+    pulses.put(offer.getHostname(), clock.nowMillis() + settings.getDelayMs());
 
     ExecutorInfo.Builder executor = ExecutorInfo.newBuilder()
         .setExecutorId(ExecutorID.newBuilder().setValue(EXECUTOR_NAME))
         .setName(EXECUTOR_NAME)
         .setSource(offer.getHostname())
         .addAllResources(GC_EXECUTOR_RESOURCES.toResourceList())
-        .setCommand(CommandUtil.create(gcExecutorPath.get()));
+        .setCommand(CommandUtil.create(settings.getGcExecutorPath().get()));
 
     return Optional.of(TaskInfo.newBuilder().setName("system-gc")
         .setTaskId(TaskID.newBuilder().setValue(SYSTEM_TASK_PREFIX + UUID.randomUUID().toString()))
@@ -155,4 +155,41 @@ public class GcExecutorLauncher implements TaskLauncher {
   public void cancelOffer(OfferID offer) {
     // No-op.
   }
+
+  private boolean isAlive(String hostname) {
+    Optional<Long> timestamp = Optional.fromNullable(pulses.getIfPresent(hostname));
+    return timestamp.isPresent() && clock.nowMillis() < timestamp.get();
+  }
+
+  /**
+   * Wraps configuration values for the {@code GcExecutorLauncher}.
+   */
+  public static class GcExecutorSettings {
+    private final Amount<Long, Time> gcInterval;
+    private final Optional<String> gcExecutorPath;
+    private final Random rand = new Random.SystemRandom(new java.util.Random());
+
+    public GcExecutorSettings(
+        Amount<Long, Time> gcInterval,
+        Optional<String> gcExecutorPath) {
+
+      this.gcInterval = checkNotNull(gcInterval);
+      this.gcExecutorPath = checkNotNull(gcExecutorPath);
+    }
+
+    @VisibleForTesting
+    long getMaxGcInterval() {
+      return gcInterval.as(Time.MILLISECONDS);
+    }
+
+    @VisibleForTesting
+    int getDelayMs() {
+      return rand.nextInt(gcInterval.as(Time.MILLISECONDS).intValue());
+    }
+
+    @VisibleForTesting
+    Optional<String> getGcExecutorPath() {
+      return gcExecutorPath;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/43344575/src/test/java/com/twitter/aurora/scheduler/periodic/GcExecutorLauncherTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/twitter/aurora/scheduler/periodic/GcExecutorLauncherTest.java b/src/test/java/com/twitter/aurora/scheduler/periodic/GcExecutorLauncherTest.java
index ac5e310..6124c7a 100644
--- a/src/test/java/com/twitter/aurora/scheduler/periodic/GcExecutorLauncherTest.java
+++ b/src/test/java/com/twitter/aurora/scheduler/periodic/GcExecutorLauncherTest.java
@@ -40,17 +40,22 @@ import com.twitter.aurora.gen.ScheduleStatus;
 import com.twitter.aurora.gen.ScheduledTask;
 import com.twitter.aurora.gen.TaskConfig;
 import com.twitter.aurora.gen.comm.AdjustRetainedTasks;
-import com.twitter.aurora.scheduler.PulseMonitor;
 import com.twitter.aurora.scheduler.base.Query;
 import com.twitter.aurora.scheduler.base.Tasks;
 import com.twitter.aurora.scheduler.configuration.Resources;
+import com.twitter.aurora.scheduler.periodic.GcExecutorLauncher.GcExecutorSettings;
 import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
 import com.twitter.aurora.scheduler.storage.testing.StorageTestUtil;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
 import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.testing.FakeClock;
 
 import static org.easymock.EasyMock.expect;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import static com.twitter.aurora.gen.ScheduleStatus.FAILED;
@@ -69,55 +74,61 @@ public class GcExecutorLauncherTest extends EasyMockTest {
 
   private static final String JOB_A = "jobA";
 
+  private static final Amount<Long, Time> MAX_GC_INTERVAL = Amount.of(1L, Time.HOURS);
+  private static final Optional<String> GC_EXCECUTOR_PATH = Optional.of("nonempty");
+
   private final AtomicInteger taskIdCounter = new AtomicInteger();
 
+  private FakeClock clock;
   private StorageTestUtil storageUtil;
-  private PulseMonitor<String> hostMonitor;
   private GcExecutorLauncher gcExecutorLauncher;
+  private GcExecutorSettings settings;
 
   @Before
   public void setUp() {
     storageUtil = new StorageTestUtil(this);
+    clock = new FakeClock();
     storageUtil.expectOperations();
-    hostMonitor = createMock(new Clazz<PulseMonitor<String>>() { });
-    gcExecutorLauncher = new GcExecutorLauncher(
-        hostMonitor,
-        Optional.of("nonempty"),
-        storageUtil.storage);
+    settings = createMock(GcExecutorSettings.class);
+    expect(settings.getMaxGcInterval()).andReturn(MAX_GC_INTERVAL.as(Time.MILLISECONDS)).anyTimes();
+  }
+
+  private void replayAndCreate() {
+    control.replay();
+    gcExecutorLauncher = new GcExecutorLauncher(settings, storageUtil.storage, clock);
   }
 
   @Test
-  public void testPruning() throws ThriftBinaryCodec.CodingException {
+  public void testPruning() throws Exception {
     IScheduledTask thermosPrunedTask = makeTask(JOB_A, true, FAILED);
     IScheduledTask thermosTask = makeTask(JOB_A, true, FAILED);
     IScheduledTask nonThermosTask = makeTask(JOB_A, false, FAILED);
 
-    // Service first createTask - no hosts ready for GC.
-    expect(hostMonitor.isAlive(HOST)).andReturn(true);
-
-    // Service second createTask - prune no tasks.
-    expect(hostMonitor.isAlive(HOST)).andReturn(false);
+    // First call - no tasks to be collected.
     expectGetTasksByHost(HOST, thermosPrunedTask, thermosTask, nonThermosTask);
-    hostMonitor.pulse(HOST);
+    expect(settings.getDelayMs()).andReturn(Amount.of(30, Time.MINUTES).as(Time.MILLISECONDS));
 
-    // Service third createTask - prune one tasks.
-    expect(hostMonitor.isAlive(HOST)).andReturn(false);
+    // Third call - two tasks collected.
     expectGetTasksByHost(HOST, thermosPrunedTask);
-    hostMonitor.pulse(HOST);
+    expect(settings.getDelayMs()).andReturn(Amount.of(30, Time.MINUTES).as(Time.MILLISECONDS));
 
-    control.replay();
+    expect(settings.getGcExecutorPath()).andReturn(GC_EXCECUTOR_PATH).times(5);
 
-    // First call - hostMonitor returns true, no GC.
-    Optional<TaskInfo> taskInfo = gcExecutorLauncher.createTask(OFFER);
-    assertFalse(taskInfo.isPresent());
+    replayAndCreate();
 
-    // Second call - no tasks pruned.
-    taskInfo = gcExecutorLauncher.createTask(OFFER);
+    // First call - no items in the cache, no tasks collected.
+    Optional<TaskInfo> taskInfo = gcExecutorLauncher.createTask(OFFER);
     assertTrue(taskInfo.isPresent());
     assertRetainedTasks(taskInfo.get(), thermosPrunedTask, thermosTask, nonThermosTask);
     ExecutorInfo executor1 = taskInfo.get().getExecutor();
 
-    // Third call - two tasks pruned.
+    // Second call - host item alive, no tasks collected.
+    clock.advance(Amount.of(15L, Time.MINUTES));
+    taskInfo = gcExecutorLauncher.createTask(OFFER);
+    assertFalse(taskInfo.isPresent());
+
+    // Third call - two tasks collected.
+    clock.advance(Amount.of(15L, Time.MINUTES));
     taskInfo = gcExecutorLauncher.createTask(OFFER);
     assertTrue(taskInfo.isPresent());
     assertRetainedTasks(taskInfo.get(), thermosPrunedTask);
@@ -128,7 +139,8 @@ public class GcExecutorLauncherTest extends EasyMockTest {
 
   @Test
   public void testNoAcceptingSmallOffers() {
-    control.replay();
+    expect(settings.getGcExecutorPath()).andReturn(GC_EXCECUTOR_PATH);
+    replayAndCreate();
 
     Iterable<Resource> resources =
         Resources.subtract(
@@ -146,6 +158,7 @@ public class GcExecutorLauncherTest extends EasyMockTest {
     AdjustRetainedTasks message = ThriftBinaryCodec.decode(
         AdjustRetainedTasks.class, taskInfo.getData().toByteArray());
     Map<String, IScheduledTask> byId = Tasks.mapById(ImmutableSet.copyOf(tasks));
+    assertNotNull(message);
     assertEquals(Maps.transformValues(byId, Tasks.GET_STATUS), message.getRetainedTasks());
   }