You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2014/05/15 02:27:52 UTC

git commit: Starting SLA calculations on SchedulerActive event.

Repository: incubator-aurora
Updated Branches:
  refs/heads/master 5cdc03b24 -> cde61079e


Starting SLA calculations on SchedulerActive event.

Bugs closed: AURORA-410

Reviewed at https://reviews.apache.org/r/21349/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/cde61079
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/cde61079
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/cde61079

Branch: refs/heads/master
Commit: cde61079e9fd0e959373f925e268e40c60ab764b
Parents: 5cdc03b
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Wed May 14 17:27:34 2014 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Wed May 14 17:27:34 2014 -0700

----------------------------------------------------------------------
 .../aurora/scheduler/sla/MetricCalculator.java  |   6 +-
 .../apache/aurora/scheduler/sla/SlaModule.java  |  51 ++++++--
 .../aurora/scheduler/sla/SlaModuleTest.java     | 130 +++++++++++++++++++
 3 files changed, 177 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/cde61079/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java b/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
index 13449cc..f949e46 100644
--- a/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
+++ b/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
@@ -97,6 +97,10 @@ class MetricCalculator implements Runnable {
     MetricCalculatorSettings(long refreshRateMs) {
       this.refreshRateMs = refreshRateMs;
     }
+
+    long getRefreshRateMs() {
+      return refreshRateMs;
+    }
   }
 
   private static class Counter implements Supplier<Number> {
@@ -152,7 +156,7 @@ class MetricCalculator implements Runnable {
                 Tasks.SCHEDULED_TO_INFO)).toList();
 
     long nowMs = clock.nowMillis();
-    long intervalStartMs = nowMs - settings.refreshRateMs;
+    long intervalStartMs = nowMs - settings.getRefreshRateMs();
 
     for (Entry<AlgorithmType, GroupType> slaMetric : METRICS.entries()) {
       for (Entry<String, Collection<IScheduledTask>> namedGroup

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/cde61079/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
index 8a0707e..5c655df 100644
--- a/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
@@ -23,17 +23,24 @@ import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.eventbus.Subscribe;
 import com.google.inject.AbstractModule;
 import com.google.inject.BindingAnnotation;
 import com.google.inject.Singleton;
+
 import com.twitter.common.application.modules.LifecycleModule;
 import com.twitter.common.args.Arg;
 import com.twitter.common.args.CmdLine;
+import com.twitter.common.args.constraints.Positive;
 import com.twitter.common.base.Command;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 
 import org.apache.aurora.scheduler.base.AsyncUtil;
+import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.sla.MetricCalculator.MetricCalculatorSettings;
 
 import static java.lang.annotation.ElementType.FIELD;
@@ -50,41 +57,67 @@ public class SlaModule extends AbstractModule {
 
   private static final Logger LOG = Logger.getLogger(SlaModule.class.getName());
 
-  @CmdLine(name = "sla_stat_refresh_rate", help = "The SLA stat refresh rate.")
-  private static final Arg<Amount<Long, Time>> SLA_REFRESH_RATE =
+  @Positive
+  @CmdLine(name = "sla_stat_refresh_interval", help = "The SLA stat refresh interval.")
+  private static final Arg<Amount<Long, Time>> SLA_REFRESH_INTERVAL =
       Arg.create(Amount.of(1L, Time.MINUTES));
 
+  @VisibleForTesting
   @BindingAnnotation
   @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-  private @interface SlaExecutor { }
+  @interface SlaExecutor { }
+
+  private final Amount<Long, Time> refreshInterval;
+
+  @VisibleForTesting
+  SlaModule(Amount<Long, Time> refreshInterval) {
+    this.refreshInterval = refreshInterval;
+  }
+
+  public SlaModule() {
+    this(SLA_REFRESH_INTERVAL.get());
+  }
 
   @Override
   protected void configure() {
-    bind(MetricCalculatorSettings.class).toInstance(
-        new MetricCalculatorSettings(SLA_REFRESH_RATE.get().as(Time.MILLISECONDS)));
+    bind(MetricCalculatorSettings.class)
+        .toInstance(new MetricCalculatorSettings(refreshInterval.as(Time.MILLISECONDS)));
 
     bind(MetricCalculator.class).in(Singleton.class);
     bind(ScheduledExecutorService.class)
         .annotatedWith(SlaExecutor.class)
         .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("SlaStat-%d", LOG));
 
+    PubsubEventModule.bindSubscriber(binder(), SlaUpdater.class);
     LifecycleModule.bindStartupAction(binder(), SlaUpdater.class);
   }
 
-  static class SlaUpdater implements Command {
+  static class SlaUpdater implements Command, EventSubscriber {
     private final ScheduledExecutorService executor;
     private final MetricCalculator calculator;
+    private final MetricCalculatorSettings settings;
 
     @Inject
-    SlaUpdater(@SlaExecutor ScheduledExecutorService executor, MetricCalculator calculator) {
+    SlaUpdater(
+        @SlaExecutor ScheduledExecutorService executor,
+        MetricCalculator calculator,
+        MetricCalculatorSettings settings) {
+
       this.executor = checkNotNull(executor);
       this.calculator = checkNotNull(calculator);
+      this.settings = checkNotNull(settings);
+    }
+
+    @Subscribe
+    public void schedulerActive(SchedulerActive event) {
+      long interval = settings.getRefreshRateMs();
+      executor.scheduleAtFixedRate(calculator, interval, interval, TimeUnit.MILLISECONDS);
+      LOG.info(String.format("Scheduled SLA calculation with %d msec interval.", interval));
     }
 
     @Override
     public void execute() throws RuntimeException {
-      long interval = SLA_REFRESH_RATE.get().as(Time.SECONDS);
-      executor.scheduleAtFixedRate(calculator, interval, interval, TimeUnit.SECONDS);
+      // Execution scheduled on SchedulerActive event.
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/cde61079/src/test/java/org/apache/aurora/scheduler/sla/SlaModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/sla/SlaModuleTest.java b/src/test/java/org/apache/aurora/scheduler/sla/SlaModuleTest.java
new file mode 100644
index 0000000..dc955bb
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/sla/SlaModuleTest.java
@@ -0,0 +1,130 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.sla;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Module;
+import com.twitter.common.application.modules.AppLauncherModule;
+import com.twitter.common.application.modules.LifecycleModule;
+import com.twitter.common.base.Supplier;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stat;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.Clock;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
+import org.apache.aurora.scheduler.state.PubsubTestUtil;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.easymock.EasyMock.expect;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class SlaModuleTest extends EasyMockTest {
+
+  private Injector injector;
+  private FakeClock clock;
+  private StorageTestUtil storageUtil;
+  private StatsProvider statsProvider;
+  private SlaModule module;
+  private EventSink eventSink;
+
+  @Before
+  public void setUp() throws Exception {
+    storageUtil = new StorageTestUtil(this);
+    clock = new FakeClock();
+    statsProvider = createMock(StatsProvider.class);
+    module = new SlaModule(Amount.of(5L, Time.MILLISECONDS));
+    injector = Guice.createInjector(
+        ImmutableList.<Module>builder()
+            .add(module)
+            .add(new LifecycleModule())
+            .add(new AppLauncherModule())
+            .add(new AbstractModule() {
+              @Override
+              protected void configure() {
+                PubsubTestUtil.installPubsub(binder());
+                bind(Clock.class).toInstance(clock);
+                bind(Storage.class).toInstance(storageUtil.storage);
+                bind(StatsProvider.class).toInstance(statsProvider);
+              }
+            }).build()
+    );
+    eventSink = PubsubTestUtil.startPubsub(injector);
+  }
+
+  @Test
+  public void testNoSchedulingOnStart() {
+    assertNotNull(module);
+
+    control.replay();
+
+    ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) injector.getInstance(
+        Key.get(ScheduledExecutorService.class, SlaModule.SlaExecutor.class));
+
+    assertEquals(0, executor.getQueue().size());
+    assertEquals(0, executor.getActiveCount());
+  }
+
+  @Test
+  public void testSchedulingOnEvent() throws Exception {
+    assertNotNull(module);
+
+    final CountDownLatch latch = new CountDownLatch(1);
+    StatsProvider untracked = createMock(StatsProvider.class);
+    expect(statsProvider.untracked()).andReturn(untracked).anyTimes();
+    expect(untracked.makeGauge(EasyMock.anyString(), EasyMock.<Supplier<Number>>anyObject()))
+        .andReturn(EasyMock.<Stat<Number>>anyObject())
+        .andAnswer(new IAnswer<Stat<Number>>() {
+          @Override
+          public Stat<Number> answer() throws Throwable {
+            latch.countDown();
+            return null;
+          }
+        }).anyTimes();
+
+    storageUtil.expectTaskFetch(
+        Query.unscoped(),
+        SlaTestUtil.makeTask(ImmutableMap.of(clock.nowMillis() - 1000, PENDING), 0)).anyTimes();
+    storageUtil.expectOperations();
+
+    control.replay();
+
+    eventSink.post(new SchedulerActive());
+    latch.await();
+  }
+}