You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/07/03 22:58:56 UTC

git commit: Export a stat to track the number of LOST gc_executor tasks

Repository: incubator-aurora
Updated Branches:
  refs/heads/master d30bdc895 -> 2f6211638


Export a stat to track the number of LOST gc_executor tasks

Bugs closed: AURORA-562

Reviewed at https://reviews.apache.org/r/23180/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/2f621163
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/2f621163
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/2f621163

Branch: refs/heads/master
Commit: 2f62116389a1c4a59a7da04eb92e4424cc583324
Parents: d30bdc8
Author: Joe Smith <ya...@gmail.com>
Authored: Thu Jul 3 13:48:00 2014 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Thu Jul 3 13:48:00 2014 -0700

----------------------------------------------------------------------
 .../scheduler/async/GcExecutorLauncher.java     | 29 +++++++++++++---
 .../scheduler/async/GcExecutorLauncherTest.java | 35 ++++++++++++++++----
 2 files changed, 53 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2f621163/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
index e1f6977..65f4049 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
@@ -36,6 +36,7 @@ import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Data;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.stats.Stats;
+import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.util.Clock;
 import com.twitter.common.util.Random;
 
@@ -52,6 +53,7 @@ import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.Resources;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.ExecutorID;
 import org.apache.mesos.Protos.ExecutorInfo;
 import org.apache.mesos.Protos.Offer;
@@ -87,6 +89,8 @@ public class GcExecutorLauncher implements TaskLauncher {
 
   @VisibleForTesting
   static final String SYSTEM_TASK_PREFIX = "system-gc-";
+  @VisibleForTesting
+  static final String LOST_TASKS_STAT_NAME = "gc_executor_tasks_lost";
   private static final String EXECUTOR_NAME = "aurora.gc";
 
   private final GcExecutorSettings settings;
@@ -96,6 +100,7 @@ public class GcExecutorLauncher implements TaskLauncher {
   private final Driver driver;
   private final Supplier<String> uuidGenerator;
   private final Cache<String, Long> pulses;
+  private final AtomicLong lostTasks;
 
   @Inject
   GcExecutorLauncher(
@@ -103,7 +108,8 @@ public class GcExecutorLauncher implements TaskLauncher {
       Storage storage,
       Clock clock,
       Executor executor,
-      Driver driver) {
+      Driver driver,
+      StatsProvider statsProvider) {
 
     this(
         settings,
@@ -111,6 +117,7 @@ public class GcExecutorLauncher implements TaskLauncher {
         clock,
         executor,
         driver,
+        statsProvider,
         new Supplier<String>() {
           @Override
           public String get() {
@@ -126,6 +133,7 @@ public class GcExecutorLauncher implements TaskLauncher {
       Clock clock,
       Executor executor,
       Driver driver,
+      StatsProvider statsProvider,
       Supplier<String> uuidGenerator) {
 
     this.settings = requireNonNull(settings);
@@ -137,12 +145,15 @@ public class GcExecutorLauncher implements TaskLauncher {
     this.pulses = CacheBuilder.newBuilder()
         .expireAfterWrite(settings.getMaxGcInterval(), TimeUnit.MILLISECONDS)
         .build();
+    this.lostTasks = statsProvider.makeCounter(LOST_TASKS_STAT_NAME);
   }
 
   @VisibleForTesting
-  TaskInfo makeGcTask(
+  static TaskInfo makeGcTask(
       String sourceName,
       SlaveID slaveId,
+      String gcExecutorPath,
+      String uuid,
       AdjustRetainedTasks message) {
 
     ExecutorInfo.Builder executorInfo = ExecutorInfo.newBuilder()
@@ -150,7 +161,7 @@ public class GcExecutorLauncher implements TaskLauncher {
         .setName(EXECUTOR_NAME)
         .setSource(sourceName)
         .addAllResources(GC_EXECUTOR_TASK_RESOURCES.toResourceList())
-        .setCommand(CommandUtil.create(settings.getGcExecutorPath().get()));
+        .setCommand(CommandUtil.create(gcExecutorPath));
 
     byte[] data;
     try {
@@ -161,7 +172,7 @@ public class GcExecutorLauncher implements TaskLauncher {
     }
 
     return TaskInfo.newBuilder().setName("system-gc")
-        .setTaskId(TaskID.newBuilder().setValue(SYSTEM_TASK_PREFIX + uuidGenerator.get()))
+        .setTaskId(TaskID.newBuilder().setValue(SYSTEM_TASK_PREFIX + uuid))
         .setSlaveId(slaveId)
         .setData(ByteString.copyFrom(data))
         .setExecutor(executorInfo)
@@ -177,7 +188,12 @@ public class GcExecutorLauncher implements TaskLauncher {
         Maps.transformValues(Tasks.mapById(tasksOnHost), Tasks.GET_STATUS),
         Predicates.not(Predicates.equalTo(ScheduleStatus.SANDBOX_DELETED)));
     tasksCreated.incrementAndGet();
-    return makeGcTask(hostName, slaveId, new AdjustRetainedTasks().setRetainedTasks(tasks));
+    return makeGcTask(
+        hostName,
+        slaveId,
+        settings.getGcExecutorPath().get(),
+        uuidGenerator.get(),
+        new AdjustRetainedTasks().setRetainedTasks(tasks));
   }
 
   private boolean sufficientResources(Offer offer) {
@@ -212,6 +228,9 @@ public class GcExecutorLauncher implements TaskLauncher {
   public boolean statusUpdate(TaskStatus status) {
     if (status.getTaskId().getValue().startsWith(SYSTEM_TASK_PREFIX)) {
       LOG.info("Received status update for GC task: " + Protobufs.toString(status));
+      if (status.getState() == Protos.TaskState.TASK_LOST) {
+        lostTasks.incrementAndGet();
+      }
       return true;
     } else {
       return false;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/2f621163/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
index c314a2b..0413707 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
@@ -15,6 +15,7 @@ package org.apache.aurora.scheduler.async;
 
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Suppliers;
@@ -23,6 +24,7 @@ import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.testing.FakeClock;
 
@@ -54,7 +56,10 @@ import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.FAILED;
 import static org.apache.aurora.gen.ScheduleStatus.SANDBOX_DELETED;
+import static org.apache.aurora.scheduler.async.GcExecutorLauncher.LOST_TASKS_STAT_NAME;
 import static org.apache.aurora.scheduler.async.GcExecutorLauncher.SYSTEM_TASK_PREFIX;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -71,6 +76,7 @@ public class GcExecutorLauncherTest extends EasyMockTest {
       .build();
 
   private static final String JOB_A = "jobA";
+  private static final String TASK_UUID = "gc";
 
   private static final GcExecutorSettings SETTINGS =
       new GcExecutorSettings(Amount.of(1L, Time.HOURS), Optional.of("nonempty"));
@@ -80,7 +86,9 @@ public class GcExecutorLauncherTest extends EasyMockTest {
   private FakeClock clock;
   private StorageTestUtil storageUtil;
   private Driver driver;
+  private StatsProvider statsProvider;
   private GcExecutorLauncher gcExecutorLauncher;
+  private AtomicLong lostTasks;
 
   @Before
   public void setUp() {
@@ -88,13 +96,21 @@ public class GcExecutorLauncherTest extends EasyMockTest {
     clock = new FakeClock();
     storageUtil.expectOperations();
     driver = createMock(Driver.class);
+    statsProvider = createMock(StatsProvider.class);
+    lostTasks = new AtomicLong();
+  }
+
+  private void replayAndConstruct() {
+    expect(statsProvider.makeCounter(LOST_TASKS_STAT_NAME)).andReturn(lostTasks);
+    control.replay();
     gcExecutorLauncher = new GcExecutorLauncher(
         SETTINGS,
         storageUtil.storage,
         clock,
         MoreExecutors.sameThreadExecutor(),
         driver,
-        Suppliers.ofInstance("gc"));
+        statsProvider,
+        Suppliers.ofInstance(TASK_UUID));
   }
 
   @Test
@@ -111,7 +127,7 @@ public class GcExecutorLauncherTest extends EasyMockTest {
     expectGetTasksByHost(HOST, a);
     expectAdjustRetainedTasks(a);
 
-    control.replay();
+    replayAndConstruct();
 
     // First call - no items in the cache, no tasks collected.
     assertTrue(gcExecutorLauncher.willUse(OFFER));
@@ -127,7 +143,7 @@ public class GcExecutorLauncherTest extends EasyMockTest {
 
   @Test
   public void testNoAcceptingSmallOffers() {
-    control.replay();
+    replayAndConstruct();
 
     Iterable<Resource> resources =
         Resources.subtract(
@@ -150,16 +166,21 @@ public class GcExecutorLauncherTest extends EasyMockTest {
 
   @Test
   public void testStatusUpdate() {
-    control.replay();
+    replayAndConstruct();
 
     assertTrue(gcExecutorLauncher.statusUpdate(makeStatus(SYSTEM_TASK_PREFIX)));
     assertTrue(gcExecutorLauncher.statusUpdate(makeStatus(SYSTEM_TASK_PREFIX + "1")));
     assertFalse(gcExecutorLauncher.statusUpdate(makeStatus("1" + SYSTEM_TASK_PREFIX)));
     assertFalse(gcExecutorLauncher.statusUpdate(makeStatus("asdf")));
+    assertEquals(0, lostTasks.get());
+    assertTrue(gcExecutorLauncher.statusUpdate(
+        makeStatus(SYSTEM_TASK_PREFIX).toBuilder().setState(TaskState.TASK_LOST).build()));
+    assertEquals(1, lostTasks.get());
   }
 
   @Test
   public void testGcExecutorDisabled() {
+    expect(statsProvider.makeCounter(LOST_TASKS_STAT_NAME)).andReturn(lostTasks);
     control.replay();
 
     gcExecutorLauncher = new GcExecutorLauncher(
@@ -168,6 +189,7 @@ public class GcExecutorLauncherTest extends EasyMockTest {
         clock,
         MoreExecutors.sameThreadExecutor(),
         driver,
+        statsProvider,
         Suppliers.ofInstance("gc"));
     assertFalse(gcExecutorLauncher.willUse(OFFER));
   }
@@ -180,7 +202,7 @@ public class GcExecutorLauncherTest extends EasyMockTest {
     expectGetTasksByHost(HOST, a, b);
     expectAdjustRetainedTasks(a);
 
-    control.replay();
+    replayAndConstruct();
 
     // First call - no items in the cache, no tasks collected.
     assertTrue(gcExecutorLauncher.willUse(OFFER));
@@ -190,7 +212,8 @@ public class GcExecutorLauncherTest extends EasyMockTest {
     Map<String, ScheduleStatus> statuses =
         Maps.transformValues(Tasks.mapById(ImmutableSet.copyOf(tasks)), Tasks.GET_STATUS);
     AdjustRetainedTasks message = new AdjustRetainedTasks().setRetainedTasks(statuses);
-    TaskInfo task = gcExecutorLauncher.makeGcTask(HOST, OFFER.getSlaveId(), message);
+    TaskInfo task = GcExecutorLauncher.makeGcTask(
+        HOST, OFFER.getSlaveId(), SETTINGS.getGcExecutorPath().get(), TASK_UUID, message);
     driver.launchTask(OFFER.getId(), task);
   }