You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/22 18:55:15 UTC

git commit: Only export counters in TaskVars after SchedulerActive event.

Updated Branches:
  refs/heads/master d7a82dc42 -> 479791381


Only export counters in TaskVars after SchedulerActive event.

Bugs closed: AURORA-59

Reviewed at https://reviews.apache.org/r/17095/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/47979138
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/47979138
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/47979138

Branch: refs/heads/master
Commit: 47979138100b6049acd7d6b2e4aba8858bd44040
Parents: d7a82dc
Author: Bill Farner <wf...@apache.org>
Authored: Wed Jan 22 09:48:57 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Jan 22 09:48:57 2014 -0800

----------------------------------------------------------------------
 .../org/apache/aurora/scheduler/TaskVars.java   | 72 ++++++++++++++-----
 .../apache/aurora/scheduler/TaskVarsTest.java   | 73 +++++++++++++-------
 2 files changed, 101 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/47979138/src/main/java/org/apache/aurora/scheduler/TaskVars.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskVars.java b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
index d5d752e..38e112a 100644
--- a/src/main/java/org/apache/aurora/scheduler/TaskVars.java
+++ b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
@@ -15,6 +15,7 @@
  */
 package org.apache.aurora.scheduler;
 
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Logger;
 
@@ -24,6 +25,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -52,23 +54,21 @@ import static com.google.common.base.Preconditions.checkNotNull;
 class TaskVars implements EventSubscriber {
   private static final Logger LOG = Logger.getLogger(TaskVars.class.getName());
 
-  private final LoadingCache<String, AtomicLong> countersByStatus;
-  private final LoadingCache<String, AtomicLong> countersByRack;
-
+  private final LoadingCache<String, Counter> counters;
   private final Storage storage;
+  private volatile boolean exporting = false;
 
   @Inject
   TaskVars(Storage storage, final StatsProvider statProvider) {
     this.storage = checkNotNull(storage);
     checkNotNull(statProvider);
-    countersByStatus = CacheBuilder.newBuilder().build(new CacheLoader<String, AtomicLong>() {
-      @Override public AtomicLong load(String statName) {
-        return statProvider.makeCounter(statName);
-      }
-    });
-    countersByRack = CacheBuilder.newBuilder().build(new CacheLoader<String, AtomicLong>() {
-      @Override public AtomicLong load(String rack) {
-        return statProvider.makeCounter(rackStatName(rack));
+    counters = CacheBuilder.newBuilder().build(new CacheLoader<String, Counter>() {
+      @Override public Counter load(String statName) {
+        Counter counter = new Counter(statProvider);
+        if (exporting) {
+          counter.exportAs(statName);
+        }
+        return counter;
       }
     });
   }
@@ -95,16 +95,16 @@ class TaskVars implements EventSubscriber {
     }
   };
 
-  private AtomicLong getCounter(ScheduleStatus status) {
-    return countersByStatus.getUnchecked(getVarName(status));
+  private Counter getCounter(ScheduleStatus status) {
+    return counters.getUnchecked(getVarName(status));
   }
 
   private void incrementCount(ScheduleStatus status) {
-    getCounter(status).incrementAndGet();
+    getCounter(status).increment();
   }
 
   private void decrementCount(ScheduleStatus status) {
-    getCounter(status).decrementAndGet();
+    getCounter(status).decrement();
   }
 
   @Subscribe
@@ -129,7 +129,7 @@ class TaskVars implements EventSubscriber {
       });
 
       if (rack.isPresent()) {
-        countersByRack.getUnchecked(rack.get()).incrementAndGet();
+        counters.getUnchecked(rackStatName(rack.get())).increment();
       } else {
         LOG.warning("Failed to find rack attribute associated with host " + host);
       }
@@ -138,15 +138,19 @@ class TaskVars implements EventSubscriber {
 
   @Subscribe
   public void schedulerActive(SchedulerActive event) {
-    // TODO(wfarner): This should probably induce the initial 'export' of stats, so that incomplete
-    // values are not surfaced while storage is recovering.
-
     // Dummy read the counter for each status counter. This is important to guarantee a stat with
     // value zero is present for each state, even if all states are not represented in the task
     // store.
     for (ScheduleStatus status : ScheduleStatus.values()) {
       getCounter(status);
     }
+
+    // Initiate export of all counters.  This is not done initially to avoid exporting values that
+    // do not represent the entire storage contents.
+    exporting = true;
+    for (Map.Entry<String, Counter> entry : counters.asMap().entrySet()) {
+      entry.getValue().exportAs(entry.getKey());
+    }
   }
 
   @Subscribe
@@ -155,4 +159,34 @@ class TaskVars implements EventSubscriber {
       decrementCount(task.getStatus());
     }
   }
+
+  private static class Counter implements Supplier<Long> {
+    private final AtomicLong value = new AtomicLong();
+    private boolean exported = false;
+    private final StatsProvider stats;
+
+    Counter(StatsProvider stats) {
+      this.stats = stats;
+    }
+
+    @Override
+    public Long get() {
+      return value.get();
+    }
+
+    private synchronized void exportAs(String name) {
+      if (!exported) {
+        stats.makeGauge(name, this);
+        exported = true;
+      }
+    }
+
+    private void increment() {
+      value.incrementAndGet();
+    }
+
+    private void decrement() {
+      value.decrementAndGet();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/47979138/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java b/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
index dde053c..d6fc72e 100644
--- a/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
@@ -16,11 +16,12 @@
 package org.apache.aurora.scheduler;
 
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
+import com.twitter.common.stats.Stat;
 import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
@@ -36,6 +37,8 @@ import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
 import org.easymock.IExpectationSetters;
 import org.junit.Before;
 import org.junit.Test;
@@ -49,6 +52,7 @@ import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 public class TaskVarsTest extends EasyMockTest {
 
@@ -60,7 +64,7 @@ public class TaskVarsTest extends EasyMockTest {
   private StorageTestUtil storageUtil;
   private StatsProvider trackedStats;
   private TaskVars vars;
-  private Map<ScheduleStatus, AtomicLong> globalCounters;
+  private Map<String, Supplier<Long>> globalCounters;
 
   @Before
   public void setUp() {
@@ -72,11 +76,21 @@ public class TaskVarsTest extends EasyMockTest {
     globalCounters = Maps.newHashMap();
   }
 
+  private void expectStatExport(final String name) {
+    expect(trackedStats.makeGauge(EasyMock.eq(name), EasyMock.<Supplier<Long>>anyObject()))
+        .andAnswer(new IAnswer<Stat<Long>>() {
+          @SuppressWarnings("unchecked")
+          @Override public Stat<Long> answer() {
+            assertFalse(globalCounters.containsKey(name));
+            globalCounters.put(name, (Supplier<Long>) EasyMock.getCurrentArguments()[1]);
+            return null;
+          }
+        });
+  }
+
   private void expectStatusCountersInitialized() {
     for (ScheduleStatus status : ScheduleStatus.values()) {
-      AtomicLong counter = new AtomicLong(0);
-      globalCounters.put(status, counter);
-      expect(trackedStats.makeCounter(TaskVars.getVarName(status))).andReturn(counter);
+      expectStatExport(TaskVars.getVarName(status));
     }
   }
 
@@ -109,8 +123,8 @@ public class TaskVarsTest extends EasyMockTest {
   }
 
   private void assertAllZero() {
-    for (AtomicLong counter : globalCounters.values()) {
-      assertEquals(0L, counter.get());
+    for (Supplier<Long> counter : globalCounters.values()) {
+      assertEquals(0L, counter.get().longValue());
     }
   }
 
@@ -128,8 +142,19 @@ public class TaskVarsTest extends EasyMockTest {
   public void testNoEarlyExport() {
     control.replay();
 
-    // No variables should be exported prior to storage starting.
+    // No variables should be exported since schedulerActive is never called.
     vars = new TaskVars(storageUtil.storage, trackedStats);
+    IScheduledTask taskA = makeTask(JOB_A, INIT);
+    changeState(taskA, PENDING);
+    changeState(IScheduledTask.build(taskA.newBuilder().setStatus(PENDING)), ASSIGNED);
+  }
+
+  private int getValue(String name) {
+    return globalCounters.get(name).get().intValue();
+  }
+
+  private int getValue(ScheduleStatus status) {
+    return getValue(TaskVars.getVarName(status));
   }
 
   @Test
@@ -141,16 +166,16 @@ public class TaskVarsTest extends EasyMockTest {
 
     IScheduledTask taskA = makeTask(JOB_A, INIT);
     changeState(taskA, PENDING);
-    assertEquals(1, globalCounters.get(PENDING).get());
+    assertEquals(1, getValue(PENDING));
     changeState(IScheduledTask.build(taskA.newBuilder().setStatus(PENDING)), ASSIGNED);
-    assertEquals(0, globalCounters.get(PENDING).get());
-    assertEquals(1, globalCounters.get(ASSIGNED).get());
+    assertEquals(0, getValue(PENDING));
+    assertEquals(1, getValue(ASSIGNED));
     changeState(IScheduledTask.build(taskA.newBuilder().setStatus(ASSIGNED)), RUNNING);
-    assertEquals(0, globalCounters.get(ASSIGNED).get());
-    assertEquals(1, globalCounters.get(RUNNING).get());
+    assertEquals(0, getValue(ASSIGNED));
+    assertEquals(1, getValue(RUNNING));
     changeState(IScheduledTask.build(taskA.newBuilder().setStatus(RUNNING)), FINISHED);
-    assertEquals(0, globalCounters.get(RUNNING).get());
-    assertEquals(1, globalCounters.get(FINISHED).get());
+    assertEquals(0, getValue(RUNNING));
+    assertEquals(1, getValue(FINISHED));
     vars.tasksDeleted(new TasksDeleted(ImmutableSet.of(
         IScheduledTask.build(taskA.newBuilder().setStatus(FINISHED)))));
     assertAllZero();
@@ -168,10 +193,10 @@ public class TaskVarsTest extends EasyMockTest {
         makeTask(JOB_B, PENDING),
         makeTask(JOB_B, FAILED));
 
-    assertEquals(2, globalCounters.get(PENDING).get());
-    assertEquals(1, globalCounters.get(RUNNING).get());
-    assertEquals(1, globalCounters.get(FINISHED).get());
-    assertEquals(1, globalCounters.get(FAILED).get());
+    assertEquals(2, getValue(PENDING));
+    assertEquals(1, getValue(RUNNING));
+    assertEquals(1, getValue(FINISHED));
+    assertEquals(1, getValue(FAILED));
   }
 
   private IExpectationSetters<?> expectGetHostRack(String host, String rackToReturn) {
@@ -190,10 +215,8 @@ public class TaskVarsTest extends EasyMockTest {
     expectGetHostRack("host2", "rackB").atLeastOnce();
     expectGetHostRack("host3", "rackB").atLeastOnce();
 
-    AtomicLong rackA = new AtomicLong();
-    expect(trackedStats.makeCounter(TaskVars.rackStatName("rackA"))).andReturn(rackA);
-    AtomicLong rackB = new AtomicLong();
-    expect(trackedStats.makeCounter(TaskVars.rackStatName("rackB"))).andReturn(rackB);
+    expectStatExport(TaskVars.rackStatName("rackA"));
+    expectStatExport(TaskVars.rackStatName("rackB"));
 
     control.replay();
     schedulerActivated();
@@ -208,8 +231,8 @@ public class TaskVarsTest extends EasyMockTest {
     changeState(c, LOST);
     changeState(d, LOST);
 
-    assertEquals(2, rackA.get());
-    assertEquals(2, rackB.get());
+    assertEquals(2, getValue(TaskVars.rackStatName("rackA")));
+    assertEquals(2, getValue(TaskVars.rackStatName("rackB")));
   }
 
   @Test