You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by me...@apache.org on 2017/02/25 04:38:03 UTC

aurora git commit: Currently snapshot times are exposed for the entire snapshot save/apply operation. This patch provides the means to collect finer-grained metrics on individual fields in a snapshot.

Repository: aurora
Updated Branches:
  refs/heads/master 98eb99aaa -> fadfd554d


Currently snapshot times are exposed for the entire snapshot save/apply operation. This patch provides the means to collect finer-grained metrics on individual fields in a snapshot.

Bugs closed: AURORA-1870

Reviewed at https://reviews.apache.org/r/55105/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/fadfd554
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/fadfd554
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/fadfd554

Branch: refs/heads/master
Commit: fadfd554dc400e58aefc82b699adb62357a7577e
Parents: 98eb99a
Author: Mehrdad Nurolahzade <me...@apache.org>
Authored: Fri Feb 24 20:31:20 2017 -0800
Committer: Mehrdad Nurolahzade <mn...@twitter.com>
Committed: Fri Feb 24 20:31:20 2017 -0800

----------------------------------------------------------------------
 .../aurora/common/stats/SlidingStats.java       |  86 ++++++++++-
 .../aurora/common/stats/SlidingStatsTest.java   | 142 +++++++++++++++++++
 .../storage/log/SnapshotStoreImpl.java          |  42 +++++-
 .../storage/log/SnapshotStoreImplIT.java        |  33 +++++
 4 files changed, 295 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/fadfd554/commons/src/main/java/org/apache/aurora/common/stats/SlidingStats.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/stats/SlidingStats.java b/commons/src/main/java/org/apache/aurora/common/stats/SlidingStats.java
index f7a5ae4..f2590bb 100644
--- a/commons/src/main/java/org/apache/aurora/common/stats/SlidingStats.java
+++ b/commons/src/main/java/org/apache/aurora/common/stats/SlidingStats.java
@@ -16,6 +16,9 @@ package org.apache.aurora.common.stats;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.util.Clock;
+
+import static java.util.Objects.requireNonNull;
 
 /**
  * Tracks event statistics over a sliding window of time. An event is something that has a
@@ -25,11 +28,70 @@ import org.apache.aurora.common.base.MorePreconditions;
  */
 public class SlidingStats {
 
+  /**
+   * An action whose execution can be timed via {@link SlidingStats#time(Timeable)}.
+   *
+   * @param <V> The type of the result of the successfully completed action.
+   * @param <E> The exception type that the action might throw.
+   */
+  @FunctionalInterface
+  public interface Timeable<V, E extends Exception> {
+
+    /**
+     * A convenient typedef for an action that throws no checked exceptions - it runs quietly.
+     *
+     * @param <V> The type of the result of the successfully completed action.
+     */
+    @FunctionalInterface
+    interface Quiet<V> extends Timeable<V, RuntimeException> {
+      // Specialization only; declares no additional members.
+    }
+
+    /**
+     * Encapsulates an action that produces no result.
+     *
+     * @param <E> The exception type that the action might throw.
+     */
+    @FunctionalInterface
+    interface NoResult<E extends Exception> extends Timeable<Void, E> {
+      @Override
+      default Void invoke() throws E {
+        execute();
+        return null; // Void cannot be instantiated, so null is the only possible result.
+      }
+
+      /**
+       * Similar to {@link Timeable#invoke()} except that no result is returned.
+       *
+       * @throws E If the action fails.
+       */
+      void execute() throws E;
+
+      /**
+       * A convenient typedef for an action with no result that throws no checked exceptions - it
+       * runs quietly.
+       */
+      @FunctionalInterface
+      interface Quiet extends NoResult<RuntimeException> {
+        // Specialization only; declares no additional members.
+      }
+    }
+
+    /**
+     * Performs an action that has a result, but may also throw a specific exception.
+     *
+     * @return The result of the successfully completed action.
+     * @throws E If the action fails.
+     */
+    V invoke() throws E;
+  }
+
   private static final int DEFAULT_WINDOW_SIZE = 1;
 
   private final AtomicLong total;
   private final AtomicLong events;
   private final Stat<Double> perEventLatency;
+  private final Clock clock;
 
   /**
    * Creates a new sliding statistic with the given name
@@ -38,7 +100,7 @@ public class SlidingStats {
    * @param totalUnitDisplay String to display for the total counter unit.
    */
   public SlidingStats(String name, String totalUnitDisplay) {
-    this(name, totalUnitDisplay, DEFAULT_WINDOW_SIZE);
+    this(name, totalUnitDisplay, DEFAULT_WINDOW_SIZE, Clock.SYSTEM_CLOCK);
   }
 
   /**
@@ -47,8 +109,9 @@ public class SlidingStats {
    * @param name Name for this stat collection.
    * @param totalUnitDisplay String to display for the total counter unit.
    * @param windowSize The window size for the per second Rate and Ratio stats.
+   * @param clock The clock abstraction to use for timing in {@link #time(Timeable)} calls.
    */
-  public SlidingStats(String name, String totalUnitDisplay, int windowSize) {
+  public SlidingStats(String name, String totalUnitDisplay, int windowSize, Clock clock) {
     MorePreconditions.checkNotBlank(name);
 
     String totalDisplay = name + "_" + totalUnitDisplay + "_total";
@@ -58,6 +121,7 @@ public class SlidingStats {
     perEventLatency = Stats.export(Ratio.of(name + "_" + totalUnitDisplay + "_per_event",
         Rate.of(totalDisplay + "_per_sec", total).withWindowSize(windowSize).build(),
         Rate.of(eventDisplay + "_per_sec", events).withWindowSize(windowSize).build()));
+    this.clock = requireNonNull(clock);
   }
 
   public AtomicLong getTotalCounter() {
@@ -86,6 +150,24 @@ public class SlidingStats {
     events.incrementAndGet();
   }
 
+  /**
+   * Times the supplied action, accumulating the elapsed nanoseconds even if the action throws.
+   *
+   * @param action An action that produces a result of type V and may throw exception E.
+   * @param <V> The return type of action.
+   * @param <E> The exception type that might be thrown by action.
+   * @return The value returned by action.
+   * @throws E A subclass of {@link Exception} that might be thrown by action.
+   */
+  public <V, E extends Exception> V time(Timeable<V, E> action) throws E {
+    long start = clock.nowNanos();
+    try {
+      return action.invoke();
+    } finally {
+      accumulate(clock.nowNanos() - start);
+    }
+  }
+
   @Override
   public String toString() {
     return total + " " + events;

http://git-wip-us.apache.org/repos/asf/aurora/blob/fadfd554/commons/src/test/java/org/apache/aurora/common/stats/SlidingStatsTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/stats/SlidingStatsTest.java b/commons/src/test/java/org/apache/aurora/common/stats/SlidingStatsTest.java
new file mode 100644
index 0000000..5b58624
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/stats/SlidingStatsTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.stats.SlidingStats.Timeable;
+import org.apache.aurora.common.util.testing.FakeClock;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+public class SlidingStatsTest {
+
+  private static final long OPERATION_TIME = 5L;
+  private static final Amount<Long, Time> OPERATION_TIME_AMOUNT =
+      Amount.of(OPERATION_TIME, Time.NANOSECONDS);
+
+  private FakeClock clock;
+  private TimedActions actions;
+  private SlidingStats stat;
+
+  @Before
+  public void setUp() {
+    clock = new FakeClock();
+    actions = new TimedActions(clock);
+    stat = new SlidingStats("a", "nanos", 1, clock);
+  }
+
+  @Test
+  public void testAccumulate() {
+    assertEquals(0L, stat.getEventCounter().get());
+    assertEquals(0L, stat.getTotalCounter().get());
+
+    stat.accumulate(100L);
+
+    assertEquals(1L, stat.getEventCounter().get());
+    assertEquals(100L, stat.getTotalCounter().get());
+  }
+
+  @Test
+  public void testTimeable() {
+    String value = stat.time(() -> actions.quietAction("World"));
+    assertEquals("HelloWorld", value);
+    assertEquals(1L, stat.getEventCounter().get());
+    assertEquals(OPERATION_TIME, stat.getTotalCounter().get());
+  }
+
+  @Test
+  public void testNoResultQuietTimeable() {
+    stat.time((Timeable.NoResult.Quiet) actions::noResultQuietAction);
+    assertEquals(1L, stat.getEventCounter().get());
+    assertEquals(OPERATION_TIME, stat.getTotalCounter().get());
+  }
+
+  @Test
+  public void testTimeableThrowsException() {
+    String value = null;
+    try {
+      value = stat.time(actions::action);
+      fail("Should have thrown exception.");
+    } catch (Exception e) {
+      assertEquals("Expected!", e.getMessage());
+    }
+    assertNull(value);
+    assertEquals(1L, stat.getEventCounter().get());
+    assertEquals(OPERATION_TIME, stat.getTotalCounter().get());
+  }
+
+  @Test
+  public void testNoResultTimeableThrowsException() {
+    try {
+      stat.time((Timeable.NoResult<Exception>) actions::noResultAction);
+      fail("Should have thrown exception.");
+    } catch (Exception e) {
+      assertEquals("Expected!", e.getMessage());
+    }
+    assertEquals(1L, stat.getEventCounter().get());
+    assertEquals(OPERATION_TIME, stat.getTotalCounter().get());
+  }
+
+  @Test
+  public void testQuietTimeableThrowsRuntimeException() {
+    String value = null;
+    try {
+      value = stat.time(actions::quietExceptionalAction);
+      fail("Should have thrown exception.");
+    } catch (RuntimeException e) {
+      assertEquals("Expected!", e.getMessage());
+    }
+    assertNull(value);
+    assertEquals(1L, stat.getEventCounter().get());
+    assertEquals(OPERATION_TIME, stat.getTotalCounter().get());
+  }
+
+  private static class TimedActions {
+    private final FakeClock clock;
+
+    public TimedActions(FakeClock clock) {
+      this.clock = clock;
+    }
+
+    String action() throws Exception {
+      clock.advance(OPERATION_TIME_AMOUNT);
+      throw new Exception("Expected!");
+    }
+
+    String quietAction(String input) {
+      clock.advance(OPERATION_TIME_AMOUNT);
+      return "Hello" + input;
+    }
+
+    String quietExceptionalAction() {
+      clock.advance(OPERATION_TIME_AMOUNT);
+      throw new RuntimeException("Expected!");
+    }
+
+    void noResultAction() throws Exception {
+      clock.advance(OPERATION_TIME_AMOUNT);
+      throw new Exception("Expected!");
+    }
+
+    void noResultQuietAction() {
+      // Advancing the fake clock is the only effect the timing assertions rely on.
+      clock.advance(OPERATION_TIME_AMOUNT);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/fadfd554/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
index 7aa111e..81a8cca 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
@@ -30,13 +30,19 @@ import javax.inject.Inject;
 import javax.inject.Qualifier;
 import javax.sql.DataSource;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 
 import org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import org.apache.aurora.common.stats.SlidingStats;
+import org.apache.aurora.common.stats.SlidingStats.Timeable;
 import org.apache.aurora.common.util.BuildInfo;
 import org.apache.aurora.common.util.Clock;
 import org.apache.aurora.gen.HostAttributes;
@@ -76,6 +82,11 @@ import static java.util.Objects.requireNonNull;
  */
 public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
 
+  @VisibleForTesting
+  static final String SNAPSHOT_SAVE = "snapshot_save_";
+  @VisibleForTesting
+  static final String SNAPSHOT_RESTORE = "snapshot_restore_";
+
   private static final Logger LOG = LoggerFactory.getLogger(SnapshotStoreImpl.class);
 
   /**
@@ -473,7 +484,7 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
       // one of the field closures is mean and tries to apply a timestamp.
       long timestamp = clock.nowMillis();
       for (SnapshotField field : snapshotFields) {
-        field.saveToSnapshot(storeProvider, snapshot);
+        field.save(storeProvider, snapshot);
       }
 
       SchedulerMetadata metadata = new SchedulerMetadata()
@@ -495,16 +506,35 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
       LOG.info("Restoring snapshot.");
 
       for (SnapshotField field : snapshotFields) {
-        field.restoreFromSnapshot(storeProvider, snapshot);
+        field.restore(storeProvider, snapshot);
       }
     });
   }
 
-  private interface SnapshotField {
-    String getName();
+  abstract class SnapshotField {
+
+    abstract String getName();
+
+    abstract void saveToSnapshot(MutableStoreProvider storeProvider, Snapshot snapshot);
+
+    abstract void restoreFromSnapshot(MutableStoreProvider storeProvider, Snapshot snapshot);
 
-    void saveToSnapshot(MutableStoreProvider storeProvider, Snapshot snapshot);
+    void save(MutableStoreProvider storeProvider, Snapshot snapshot) {
+      stats.getUnchecked(SNAPSHOT_SAVE + getName())
+          .time((Timeable.NoResult.Quiet) () -> saveToSnapshot(storeProvider, snapshot));
+    }
 
-    void restoreFromSnapshot(MutableStoreProvider storeProvider, Snapshot snapshot);
+    void restore(MutableStoreProvider storeProvider, Snapshot snapshot) {
+      stats.getUnchecked(SNAPSHOT_RESTORE + getName())
+          .time((Timeable.NoResult.Quiet) () -> restoreFromSnapshot(storeProvider, snapshot));
+    }
   }
+
+  private final LoadingCache<String, SlidingStats> stats = CacheBuilder.newBuilder().build(
+      new CacheLoader<String, SlidingStats>() {
+        @Override
+        public SlidingStats load(String name) throws Exception {
+          return new SlidingStats(name, "nanos");
+        }
+      });
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/fadfd554/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
index f56a162..ca95256 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
@@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
+import org.apache.aurora.common.stats.Stats;
 import org.apache.aurora.common.util.testing.FakeBuildInfo;
 import org.apache.aurora.common.util.testing.FakeClock;
 import org.apache.aurora.gen.Attribute;
@@ -76,8 +77,11 @@ import static org.apache.aurora.scheduler.storage.db.DbModule.testModuleWithWork
 import static org.apache.aurora.scheduler.storage.db.DbUtil.createStorage;
 import static org.apache.aurora.scheduler.storage.db.DbUtil.createStorageInjector;
 import static org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl.ALL_H2_STORE_FIELDS;
+import static org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl.SNAPSHOT_RESTORE;
+import static org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl.SNAPSHOT_SAVE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -106,6 +110,7 @@ public class SnapshotStoreImplIT {
         hydrateFields,
         createStorageInjector(testModuleWithWorkQueue()).getInstance(MigrationManager.class),
         TaskTestUtil.THRIFT_BACKFILL);
+    Stats.flush();
   }
 
   private static Snapshot makeComparable(Snapshot snapshot) {
@@ -124,11 +129,14 @@ public class SnapshotStoreImplIT {
     Snapshot snapshot1 = snapshotStore.createSnapshot();
     assertEquals(expected(), makeComparable(snapshot1));
     assertFalse(snapshot1.isExperimentalTaskStore());
+    assertSnapshotSaveStats(ALL_H2_STORE_FIELDS, 1L);
 
     snapshotStore.applySnapshot(snapshot1);
     Snapshot snapshot2 = snapshotStore.createSnapshot();
     assertEquals(expected(), makeComparable(snapshot2));
     assertEquals(makeComparable(snapshot1), makeComparable(snapshot2));
+    assertSnapshotRestoreStats(ALL_H2_STORE_FIELDS, 1L);
+    assertSnapshotSaveStats(ALL_H2_STORE_FIELDS, 2L);
   }
 
   @Test
@@ -139,6 +147,7 @@ public class SnapshotStoreImplIT {
     Snapshot snapshot1 = snapshotStore.createSnapshot();
     assertEquals(expected(), makeComparable(snapshot1));
     assertFalse(snapshot1.isExperimentalTaskStore());
+    assertSnapshotSaveStats(ALL_H2_STORE_FIELDS, 1L);
 
     setUpStore(true, ALL_H2_STORE_FIELDS);
     snapshotStore.applySnapshot(snapshot1);
@@ -146,6 +155,8 @@ public class SnapshotStoreImplIT {
     assertTrue(snapshot2.isExperimentalTaskStore());
     assertEquals(expected(), makeComparable(snapshot2));
     assertEquals(makeComparable(snapshot1), makeComparable(snapshot2));
+    assertSnapshotRestoreStats(ALL_H2_STORE_FIELDS, 1L);
+    assertSnapshotSaveStats(ALL_H2_STORE_FIELDS, 1L);
   }
 
   @Test
@@ -156,6 +167,7 @@ public class SnapshotStoreImplIT {
     Snapshot snapshot1 = snapshotStore.createSnapshot();
     assertEquals(expected(), makeComparable(snapshot1));
     assertTrue(snapshot1.isExperimentalTaskStore());
+    assertSnapshotSaveStats(ALL_H2_STORE_FIELDS, 1L);
 
     setUpStore(false, ALL_H2_STORE_FIELDS);
     snapshotStore.applySnapshot(snapshot1);
@@ -163,6 +175,8 @@ public class SnapshotStoreImplIT {
     assertFalse(snapshot2.isExperimentalTaskStore());
     assertEquals(expected(), makeComparable(snapshot2));
     assertEquals(makeComparable(snapshot1), makeComparable(snapshot2));
+    assertSnapshotRestoreStats(ALL_H2_STORE_FIELDS, 1L);
+    assertSnapshotSaveStats(ALL_H2_STORE_FIELDS, 1L);
   }
 
   @Test
@@ -186,11 +200,14 @@ public class SnapshotStoreImplIT {
     Snapshot snapshot1 = snapshotStore.createSnapshot();
     assertEquals(expected(), makeComparable(snapshot1));
     assertTrue(snapshot1.isExperimentalTaskStore());
+    assertSnapshotSaveStats(ALL_H2_STORE_FIELDS, 1L);
 
     snapshotStore.applySnapshot(snapshot1);
     Snapshot snapshot2 = snapshotStore.createSnapshot();
     assertEquals(expected(), makeComparable(snapshot2));
     assertEquals(makeComparable(snapshot1), makeComparable(snapshot2));
+    assertSnapshotRestoreStats(ALL_H2_STORE_FIELDS, 1L);
+    assertSnapshotSaveStats(ALL_H2_STORE_FIELDS, 2L);
   }
 
   @Test
@@ -200,6 +217,8 @@ public class SnapshotStoreImplIT {
 
     Snapshot backfilled = snapshotStore.createSnapshot();
     assertEquals(expected(), makeComparable(backfilled));
+    assertSnapshotRestoreStats(ALL_H2_STORE_FIELDS, 1L);
+    assertSnapshotSaveStats(ALL_H2_STORE_FIELDS, 1L);
   }
 
   private static final IScheduledTask TASK = TaskTestUtil.makeTask("id", JOB_KEY);
@@ -306,4 +325,18 @@ public class SnapshotStoreImplIT {
       );
     });
   }
+
+  private void assertSnapshotSaveStats(Set<String> stats, long count) {
+    for (String stat : stats) {
+      assertEquals(count, Stats.getVariable(SNAPSHOT_SAVE + stat + "_events").read());
+      assertNotNull(Stats.getVariable(SNAPSHOT_SAVE + stat + "_nanos_total"));
+    }
+  }
+
+  private void assertSnapshotRestoreStats(Set<String> stats, long count) {
+    for (String stat : stats) {
+      assertEquals(count, Stats.getVariable(SNAPSHOT_RESTORE + stat + "_events").read());
+      assertNotNull(Stats.getVariable(SNAPSHOT_RESTORE + stat + "_nanos_total"));
+    }
+  }
 }