You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2014/05/15 02:27:52 UTC
git commit: Starting SLA calculations on SchedulerActive event.
Repository: incubator-aurora
Updated Branches:
refs/heads/master 5cdc03b24 -> cde61079e
Starting SLA calculations on SchedulerActive event.
Bugs closed: AURORA-410
Reviewed at https://reviews.apache.org/r/21349/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/cde61079
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/cde61079
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/cde61079
Branch: refs/heads/master
Commit: cde61079e9fd0e959373f925e268e40c60ab764b
Parents: 5cdc03b
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Wed May 14 17:27:34 2014 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Wed May 14 17:27:34 2014 -0700
----------------------------------------------------------------------
.../aurora/scheduler/sla/MetricCalculator.java | 6 +-
.../apache/aurora/scheduler/sla/SlaModule.java | 51 ++++++--
.../aurora/scheduler/sla/SlaModuleTest.java | 130 +++++++++++++++++++
3 files changed, 177 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/cde61079/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java b/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
index 13449cc..f949e46 100644
--- a/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
+++ b/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
@@ -97,6 +97,10 @@ class MetricCalculator implements Runnable {
MetricCalculatorSettings(long refreshRateMs) {
this.refreshRateMs = refreshRateMs;
}
+
+ long getRefreshRateMs() {
+ return refreshRateMs;
+ }
}
private static class Counter implements Supplier<Number> {
@@ -152,7 +156,7 @@ class MetricCalculator implements Runnable {
Tasks.SCHEDULED_TO_INFO)).toList();
long nowMs = clock.nowMillis();
- long intervalStartMs = nowMs - settings.refreshRateMs;
+ long intervalStartMs = nowMs - settings.getRefreshRateMs();
for (Entry<AlgorithmType, GroupType> slaMetric : METRICS.entries()) {
for (Entry<String, Collection<IScheduledTask>> namedGroup
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/cde61079/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
index 8a0707e..5c655df 100644
--- a/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
@@ -23,17 +23,24 @@ import java.util.logging.Logger;
import javax.inject.Inject;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.eventbus.Subscribe;
import com.google.inject.AbstractModule;
import com.google.inject.BindingAnnotation;
import com.google.inject.Singleton;
+
import com.twitter.common.application.modules.LifecycleModule;
import com.twitter.common.args.Arg;
import com.twitter.common.args.CmdLine;
+import com.twitter.common.args.constraints.Positive;
import com.twitter.common.base.Command;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import org.apache.aurora.scheduler.base.AsyncUtil;
+import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
import org.apache.aurora.scheduler.sla.MetricCalculator.MetricCalculatorSettings;
import static java.lang.annotation.ElementType.FIELD;
@@ -50,41 +57,67 @@ public class SlaModule extends AbstractModule {
private static final Logger LOG = Logger.getLogger(SlaModule.class.getName());
- @CmdLine(name = "sla_stat_refresh_rate", help = "The SLA stat refresh rate.")
- private static final Arg<Amount<Long, Time>> SLA_REFRESH_RATE =
+ @Positive
+ @CmdLine(name = "sla_stat_refresh_interval", help = "The SLA stat refresh interval.")
+ private static final Arg<Amount<Long, Time>> SLA_REFRESH_INTERVAL =
Arg.create(Amount.of(1L, Time.MINUTES));
+ @VisibleForTesting
@BindingAnnotation
@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
- private @interface SlaExecutor { }
+ @interface SlaExecutor { }
+
+ private final Amount<Long, Time> refreshInterval;
+
+ @VisibleForTesting
+ SlaModule(Amount<Long, Time> refreshInterval) {
+ this.refreshInterval = refreshInterval;
+ }
+
+ public SlaModule() {
+ this(SLA_REFRESH_INTERVAL.get());
+ }
@Override
protected void configure() {
- bind(MetricCalculatorSettings.class).toInstance(
- new MetricCalculatorSettings(SLA_REFRESH_RATE.get().as(Time.MILLISECONDS)));
+ bind(MetricCalculatorSettings.class)
+ .toInstance(new MetricCalculatorSettings(refreshInterval.as(Time.MILLISECONDS)));
bind(MetricCalculator.class).in(Singleton.class);
bind(ScheduledExecutorService.class)
.annotatedWith(SlaExecutor.class)
.toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("SlaStat-%d", LOG));
+ PubsubEventModule.bindSubscriber(binder(), SlaUpdater.class);
LifecycleModule.bindStartupAction(binder(), SlaUpdater.class);
}
- static class SlaUpdater implements Command {
+ static class SlaUpdater implements Command, EventSubscriber {
private final ScheduledExecutorService executor;
private final MetricCalculator calculator;
+ private final MetricCalculatorSettings settings;
@Inject
- SlaUpdater(@SlaExecutor ScheduledExecutorService executor, MetricCalculator calculator) {
+ SlaUpdater(
+ @SlaExecutor ScheduledExecutorService executor,
+ MetricCalculator calculator,
+ MetricCalculatorSettings settings) {
+
this.executor = checkNotNull(executor);
this.calculator = checkNotNull(calculator);
+ this.settings = checkNotNull(settings);
+ }
+
+ @Subscribe
+ public void schedulerActive(SchedulerActive event) {
+ long interval = settings.getRefreshRateMs();
+ executor.scheduleAtFixedRate(calculator, interval, interval, TimeUnit.MILLISECONDS);
+ LOG.info(String.format("Scheduled SLA calculation with %d msec interval.", interval));
}
@Override
public void execute() throws RuntimeException {
- long interval = SLA_REFRESH_RATE.get().as(Time.SECONDS);
- executor.scheduleAtFixedRate(calculator, interval, interval, TimeUnit.SECONDS);
+ // Execution scheduled on SchedulerActive event.
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/cde61079/src/test/java/org/apache/aurora/scheduler/sla/SlaModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/sla/SlaModuleTest.java b/src/test/java/org/apache/aurora/scheduler/sla/SlaModuleTest.java
new file mode 100644
index 0000000..dc955bb
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/sla/SlaModuleTest.java
@@ -0,0 +1,130 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.sla;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Module;
+import com.twitter.common.application.modules.AppLauncherModule;
+import com.twitter.common.application.modules.LifecycleModule;
+import com.twitter.common.base.Supplier;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stat;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.Clock;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
+import org.apache.aurora.scheduler.state.PubsubTestUtil;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.easymock.EasyMock.expect;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class SlaModuleTest extends EasyMockTest {
+
+ private Injector injector;
+ private FakeClock clock;
+ private StorageTestUtil storageUtil;
+ private StatsProvider statsProvider;
+ private SlaModule module;
+ private EventSink eventSink;
+
+ @Before
+ public void setUp() throws Exception {
+ storageUtil = new StorageTestUtil(this);
+ clock = new FakeClock();
+ statsProvider = createMock(StatsProvider.class);
+ module = new SlaModule(Amount.of(5L, Time.MILLISECONDS));
+ injector = Guice.createInjector(
+ ImmutableList.<Module>builder()
+ .add(module)
+ .add(new LifecycleModule())
+ .add(new AppLauncherModule())
+ .add(new AbstractModule() {
+ @Override
+ protected void configure() {
+ PubsubTestUtil.installPubsub(binder());
+ bind(Clock.class).toInstance(clock);
+ bind(Storage.class).toInstance(storageUtil.storage);
+ bind(StatsProvider.class).toInstance(statsProvider);
+ }
+ }).build()
+ );
+ eventSink = PubsubTestUtil.startPubsub(injector);
+ }
+
+ @Test
+ public void testNoSchedulingOnStart() {
+ assertNotNull(module);
+
+ control.replay();
+
+ ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) injector.getInstance(
+ Key.get(ScheduledExecutorService.class, SlaModule.SlaExecutor.class));
+
+ assertEquals(0, executor.getQueue().size());
+ assertEquals(0, executor.getActiveCount());
+ }
+
+ @Test
+ public void testSchedulingOnEvent() throws Exception {
+ assertNotNull(module);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ StatsProvider untracked = createMock(StatsProvider.class);
+ expect(statsProvider.untracked()).andReturn(untracked).anyTimes();
+ expect(untracked.makeGauge(EasyMock.anyString(), EasyMock.<Supplier<Number>>anyObject()))
+ .andReturn(EasyMock.<Stat<Number>>anyObject())
+ .andAnswer(new IAnswer<Stat<Number>>() {
+ @Override
+ public Stat<Number> answer() throws Throwable {
+ latch.countDown();
+ return null;
+ }
+ }).anyTimes();
+
+ storageUtil.expectTaskFetch(
+ Query.unscoped(),
+ SlaTestUtil.makeTask(ImmutableMap.of(clock.nowMillis() - 1000, PENDING), 0)).anyTimes();
+ storageUtil.expectOperations();
+
+ control.replay();
+
+ eventSink.post(new SchedulerActive());
+ latch.await();
+ }
+}