You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/22 18:55:15 UTC
git commit: Only export counters in TaskVars after SchedulerActive
event.
Updated Branches:
refs/heads/master d7a82dc42 -> 479791381
Only export counters in TaskVars after SchedulerActive event.
Bugs closed: AURORA-59
Reviewed at https://reviews.apache.org/r/17095/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/47979138
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/47979138
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/47979138
Branch: refs/heads/master
Commit: 47979138100b6049acd7d6b2e4aba8858bd44040
Parents: d7a82dc
Author: Bill Farner <wf...@apache.org>
Authored: Wed Jan 22 09:48:57 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Jan 22 09:48:57 2014 -0800
----------------------------------------------------------------------
.../org/apache/aurora/scheduler/TaskVars.java | 72 ++++++++++++++-----
.../apache/aurora/scheduler/TaskVarsTest.java | 73 +++++++++++++-------
2 files changed, 101 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/47979138/src/main/java/org/apache/aurora/scheduler/TaskVars.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskVars.java b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
index d5d752e..38e112a 100644
--- a/src/main/java/org/apache/aurora/scheduler/TaskVars.java
+++ b/src/main/java/org/apache/aurora/scheduler/TaskVars.java
@@ -15,6 +15,7 @@
*/
package org.apache.aurora.scheduler;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
@@ -24,6 +25,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
@@ -52,23 +54,21 @@ import static com.google.common.base.Preconditions.checkNotNull;
class TaskVars implements EventSubscriber {
private static final Logger LOG = Logger.getLogger(TaskVars.class.getName());
- private final LoadingCache<String, AtomicLong> countersByStatus;
- private final LoadingCache<String, AtomicLong> countersByRack;
-
+ private final LoadingCache<String, Counter> counters;
private final Storage storage;
+ private volatile boolean exporting = false;
@Inject
TaskVars(Storage storage, final StatsProvider statProvider) {
this.storage = checkNotNull(storage);
checkNotNull(statProvider);
- countersByStatus = CacheBuilder.newBuilder().build(new CacheLoader<String, AtomicLong>() {
- @Override public AtomicLong load(String statName) {
- return statProvider.makeCounter(statName);
- }
- });
- countersByRack = CacheBuilder.newBuilder().build(new CacheLoader<String, AtomicLong>() {
- @Override public AtomicLong load(String rack) {
- return statProvider.makeCounter(rackStatName(rack));
+ counters = CacheBuilder.newBuilder().build(new CacheLoader<String, Counter>() {
+ @Override public Counter load(String statName) {
+ Counter counter = new Counter(statProvider);
+ if (exporting) {
+ counter.exportAs(statName);
+ }
+ return counter;
}
});
}
@@ -95,16 +95,16 @@ class TaskVars implements EventSubscriber {
}
};
- private AtomicLong getCounter(ScheduleStatus status) {
- return countersByStatus.getUnchecked(getVarName(status));
+ private Counter getCounter(ScheduleStatus status) {
+ return counters.getUnchecked(getVarName(status));
}
private void incrementCount(ScheduleStatus status) {
- getCounter(status).incrementAndGet();
+ getCounter(status).increment();
}
private void decrementCount(ScheduleStatus status) {
- getCounter(status).decrementAndGet();
+ getCounter(status).decrement();
}
@Subscribe
@@ -129,7 +129,7 @@ class TaskVars implements EventSubscriber {
});
if (rack.isPresent()) {
- countersByRack.getUnchecked(rack.get()).incrementAndGet();
+ counters.getUnchecked(rackStatName(rack.get())).increment();
} else {
LOG.warning("Failed to find rack attribute associated with host " + host);
}
@@ -138,15 +138,19 @@ class TaskVars implements EventSubscriber {
@Subscribe
public void schedulerActive(SchedulerActive event) {
- // TODO(wfarner): This should probably induce the initial 'export' of stats, so that incomplete
- // values are not surfaced while storage is recovering.
-
// Dummy read the counter for each status counter. This is important to guarantee a stat with
// value zero is present for each state, even if all states are not represented in the task
// store.
for (ScheduleStatus status : ScheduleStatus.values()) {
getCounter(status);
}
+
+ // Initiate export of all counters. This is not done initially to avoid exporting values that
+ // do not represent the entire storage contents.
+ exporting = true;
+ for (Map.Entry<String, Counter> entry : counters.asMap().entrySet()) {
+ entry.getValue().exportAs(entry.getKey());
+ }
}
@Subscribe
@@ -155,4 +159,34 @@ class TaskVars implements EventSubscriber {
decrementCount(task.getStatus());
}
}
+
+ private static class Counter implements Supplier<Long> {
+ private final AtomicLong value = new AtomicLong();
+ private boolean exported = false;
+ private final StatsProvider stats;
+
+ Counter(StatsProvider stats) {
+ this.stats = stats;
+ }
+
+ @Override
+ public Long get() {
+ return value.get();
+ }
+
+ private synchronized void exportAs(String name) {
+ if (!exported) {
+ stats.makeGauge(name, this);
+ exported = true;
+ }
+ }
+
+ private void increment() {
+ value.incrementAndGet();
+ }
+
+ private void decrement() {
+ value.decrementAndGet();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/47979138/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java b/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
index dde053c..d6fc72e 100644
--- a/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/TaskVarsTest.java
@@ -16,11 +16,12 @@
package org.apache.aurora.scheduler;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
+import com.twitter.common.stats.Stat;
import com.twitter.common.stats.StatsProvider;
import com.twitter.common.testing.easymock.EasyMockTest;
@@ -36,6 +37,8 @@ import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
import org.easymock.IExpectationSetters;
import org.junit.Before;
import org.junit.Test;
@@ -49,6 +52,7 @@ import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
public class TaskVarsTest extends EasyMockTest {
@@ -60,7 +64,7 @@ public class TaskVarsTest extends EasyMockTest {
private StorageTestUtil storageUtil;
private StatsProvider trackedStats;
private TaskVars vars;
- private Map<ScheduleStatus, AtomicLong> globalCounters;
+ private Map<String, Supplier<Long>> globalCounters;
@Before
public void setUp() {
@@ -72,11 +76,21 @@ public class TaskVarsTest extends EasyMockTest {
globalCounters = Maps.newHashMap();
}
+ private void expectStatExport(final String name) {
+ expect(trackedStats.makeGauge(EasyMock.eq(name), EasyMock.<Supplier<Long>>anyObject()))
+ .andAnswer(new IAnswer<Stat<Long>>() {
+ @SuppressWarnings("unchecked")
+ @Override public Stat<Long> answer() {
+ assertFalse(globalCounters.containsKey(name));
+ globalCounters.put(name, (Supplier<Long>) EasyMock.getCurrentArguments()[1]);
+ return null;
+ }
+ });
+ }
+
private void expectStatusCountersInitialized() {
for (ScheduleStatus status : ScheduleStatus.values()) {
- AtomicLong counter = new AtomicLong(0);
- globalCounters.put(status, counter);
- expect(trackedStats.makeCounter(TaskVars.getVarName(status))).andReturn(counter);
+ expectStatExport(TaskVars.getVarName(status));
}
}
@@ -109,8 +123,8 @@ public class TaskVarsTest extends EasyMockTest {
}
private void assertAllZero() {
- for (AtomicLong counter : globalCounters.values()) {
- assertEquals(0L, counter.get());
+ for (Supplier<Long> counter : globalCounters.values()) {
+ assertEquals(0L, counter.get().longValue());
}
}
@@ -128,8 +142,19 @@ public class TaskVarsTest extends EasyMockTest {
public void testNoEarlyExport() {
control.replay();
- // No variables should be exported prior to storage starting.
+ // No variables should be exported since schedulerActive is never called.
vars = new TaskVars(storageUtil.storage, trackedStats);
+ IScheduledTask taskA = makeTask(JOB_A, INIT);
+ changeState(taskA, PENDING);
+ changeState(IScheduledTask.build(taskA.newBuilder().setStatus(PENDING)), ASSIGNED);
+ }
+
+ private int getValue(String name) {
+ return globalCounters.get(name).get().intValue();
+ }
+
+ private int getValue(ScheduleStatus status) {
+ return getValue(TaskVars.getVarName(status));
}
@Test
@@ -141,16 +166,16 @@ public class TaskVarsTest extends EasyMockTest {
IScheduledTask taskA = makeTask(JOB_A, INIT);
changeState(taskA, PENDING);
- assertEquals(1, globalCounters.get(PENDING).get());
+ assertEquals(1, getValue(PENDING));
changeState(IScheduledTask.build(taskA.newBuilder().setStatus(PENDING)), ASSIGNED);
- assertEquals(0, globalCounters.get(PENDING).get());
- assertEquals(1, globalCounters.get(ASSIGNED).get());
+ assertEquals(0, getValue(PENDING));
+ assertEquals(1, getValue(ASSIGNED));
changeState(IScheduledTask.build(taskA.newBuilder().setStatus(ASSIGNED)), RUNNING);
- assertEquals(0, globalCounters.get(ASSIGNED).get());
- assertEquals(1, globalCounters.get(RUNNING).get());
+ assertEquals(0, getValue(ASSIGNED));
+ assertEquals(1, getValue(RUNNING));
changeState(IScheduledTask.build(taskA.newBuilder().setStatus(RUNNING)), FINISHED);
- assertEquals(0, globalCounters.get(RUNNING).get());
- assertEquals(1, globalCounters.get(FINISHED).get());
+ assertEquals(0, getValue(RUNNING));
+ assertEquals(1, getValue(FINISHED));
vars.tasksDeleted(new TasksDeleted(ImmutableSet.of(
IScheduledTask.build(taskA.newBuilder().setStatus(FINISHED)))));
assertAllZero();
@@ -168,10 +193,10 @@ public class TaskVarsTest extends EasyMockTest {
makeTask(JOB_B, PENDING),
makeTask(JOB_B, FAILED));
- assertEquals(2, globalCounters.get(PENDING).get());
- assertEquals(1, globalCounters.get(RUNNING).get());
- assertEquals(1, globalCounters.get(FINISHED).get());
- assertEquals(1, globalCounters.get(FAILED).get());
+ assertEquals(2, getValue(PENDING));
+ assertEquals(1, getValue(RUNNING));
+ assertEquals(1, getValue(FINISHED));
+ assertEquals(1, getValue(FAILED));
}
private IExpectationSetters<?> expectGetHostRack(String host, String rackToReturn) {
@@ -190,10 +215,8 @@ public class TaskVarsTest extends EasyMockTest {
expectGetHostRack("host2", "rackB").atLeastOnce();
expectGetHostRack("host3", "rackB").atLeastOnce();
- AtomicLong rackA = new AtomicLong();
- expect(trackedStats.makeCounter(TaskVars.rackStatName("rackA"))).andReturn(rackA);
- AtomicLong rackB = new AtomicLong();
- expect(trackedStats.makeCounter(TaskVars.rackStatName("rackB"))).andReturn(rackB);
+ expectStatExport(TaskVars.rackStatName("rackA"));
+ expectStatExport(TaskVars.rackStatName("rackB"));
control.replay();
schedulerActivated();
@@ -208,8 +231,8 @@ public class TaskVarsTest extends EasyMockTest {
changeState(c, LOST);
changeState(d, LOST);
- assertEquals(2, rackA.get());
- assertEquals(2, rackB.get());
+ assertEquals(2, getValue(TaskVars.rackStatName("rackA")));
+ assertEquals(2, getValue(TaskVars.rackStatName("rackB")));
}
@Test