You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:59 UTC

[43/50] [abbrv] samza git commit: SAMZA-834: fix perf degradation due to frequent update of Timer metrics

SAMZA-834: fix perf degradation due to frequent update of Timer metrics


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/adf4f39a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/adf4f39a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/adf4f39a

Branch: refs/heads/samza-sql
Commit: adf4f39af6666aeee1e8bc7f2c9ba6d208c6fa6e
Parents: d859378
Author: Xinyu Liu <xi...@linkedin.com>
Authored: Tue Dec 8 14:56:54 2015 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Tue Dec 8 14:56:54 2015 -0800

----------------------------------------------------------------------
 .../metrics/SlidingTimeWindowReservoir.java     | 26 +++++++++++++++-----
 .../java/org/apache/samza/metrics/Timer.java    | 12 +++++++++
 .../metrics/TestSlidingTimeWindowReservoir.java | 15 ++++++++---
 .../org/apache/samza/metrics/TestTimer.java     |  2 +-
 .../apache/samza/container/TestRunLoop.scala    | 18 +++++++++++++-
 5 files changed, 61 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/adf4f39a/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java b/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java
index df54359..4116473 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java
@@ -31,9 +31,9 @@ import org.apache.samza.util.Clock;
 public class SlidingTimeWindowReservoir implements Reservoir {
 
   /**
-   * Allow this amount of values to have the same updating time.
+   * Default number of values allowed to share the same updating time.
    */
-  private static final int TIME_COLLISION_BUFFER = 256;
+  private static final int DEFAULT_TIME_COLLISION_BUFFER = 1;
 
   /**
    * Run {@link #removeExpireValues} once every this amount of {@link #update}s
@@ -46,8 +46,13 @@ public class SlidingTimeWindowReservoir implements Reservoir {
   private static final int DEFAULT_WINDOW_SIZE_MS = 300000;
 
   /**
+   * Allow this amount of values to have the same updating time.
+   */
+  private final int collisionBuffer;
+
+  /**
    * Size of the window. The unit is millisecond. It is as
-   * <code>TIME_COLLISION_BUFFER</code> times big as the original window size.
+   * <code>collisionBuffer</code> times big as the original window size.
    */
   private final long windowMs;
 
@@ -93,11 +98,16 @@ public class SlidingTimeWindowReservoir implements Reservoir {
   }
 
   public SlidingTimeWindowReservoir(long windowMs, Clock clock) {
-    this.windowMs = windowMs * TIME_COLLISION_BUFFER;
+    this(windowMs, DEFAULT_TIME_COLLISION_BUFFER, clock);
+  }
+
+  public SlidingTimeWindowReservoir(long windowMs, int collisionBuffer, Clock clock) {
+    this.windowMs = windowMs * collisionBuffer;
     this.storage = new ConcurrentSkipListMap<Long, Long>();
     this.count = new AtomicLong();
     this.lastUpdatingTime = new AtomicLong();
     this.clock = clock;
+    this.collisionBuffer = collisionBuffer;
   }
 
   @Override
@@ -126,15 +136,19 @@ public class SlidingTimeWindowReservoir implements Reservoir {
    * value's, use the last updating time + 1 as the new updating time. This
    * operation guarantees all the updating times in the <code>storage</code>
    * strictly increment. No override happens before reaching the
-   * <code>TIME_COLLISION_BUFFER</code>.
+   * <code>collisionBuffer</code>.
    *
    * @return the updating time
    */
   private long getUpdatingTime() {
     while (true) {
       long oldTime = lastUpdatingTime.get();
-      long newTime = clock.currentTimeMillis() * TIME_COLLISION_BUFFER;
+      long newTime = clock.currentTimeMillis() * collisionBuffer;
       long updatingTime = newTime > oldTime ? newTime : oldTime + 1;
+      // make sure updatingTime doesn't overflow into the next millisecond
+      if (updatingTime == newTime + collisionBuffer) {
+        --updatingTime;
+      }
       // make sure no other threads modify the lastUpdatingTime
       if (lastUpdatingTime.compareAndSet(oldTime, updatingTime)) {
         return updatingTime;

http://git-wip-us.apache.org/repos/asf/samza/blob/adf4f39a/samza-api/src/main/java/org/apache/samza/metrics/Timer.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/Timer.java b/samza-api/src/main/java/org/apache/samza/metrics/Timer.java
index b49d147..96715e8 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/Timer.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/Timer.java
@@ -52,6 +52,18 @@ public class Timer implements Metric {
   }
 
   /**
+   * Construct a {@link Timer} with given window size and collision buffer
+   *
+   * @param name name of this timer
+   * @param windowMs the window size. unit is millisecond
+   * @param collisionBuffer number of values that may share one millisecond before the latest is overwritten
+   * @param clock the clock for the reservoir
+   */
+  public Timer(String name, long windowMs, int collisionBuffer, Clock clock) {
+    this(name, new SlidingTimeWindowReservoir(windowMs, collisionBuffer, clock));
+  }
+
+  /**
    * Construct a {@link Timer} with given {@link Reservoir}
    *
    * @param name name of this timer

http://git-wip-us.apache.org/repos/asf/samza/blob/adf4f39a/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java b/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
index d392b32..aca0f9d 100644
--- a/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
+++ b/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
@@ -34,7 +34,7 @@ public class TestSlidingTimeWindowReservoir {
   @Test
   public void testUpdateSizeSnapshot() {
     SlidingTimeWindowReservoir slidingTimeWindowReservoir =
-        new SlidingTimeWindowReservoir(300, clock);
+        new SlidingTimeWindowReservoir(300, 8, clock);
 
     when(clock.currentTimeMillis()).thenReturn(0L);
     slidingTimeWindowReservoir.update(1L);
@@ -55,20 +55,26 @@ public class TestSlidingTimeWindowReservoir {
   @Test
   public void testDuplicateTime() {
     SlidingTimeWindowReservoir slidingTimeWindowReservoir =
-        new SlidingTimeWindowReservoir(300, clock);
-    when(clock.currentTimeMillis()).thenReturn(0L);
+        new SlidingTimeWindowReservoir(300, 2, clock);
+    when(clock.currentTimeMillis()).thenReturn(1L);
     slidingTimeWindowReservoir.update(1L);
     slidingTimeWindowReservoir.update(2L);
 
     Snapshot snapshot = slidingTimeWindowReservoir.getSnapshot();
     assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L)));
     assertEquals(2, snapshot.getSize());
+
+    // third update at the same time exceeds the buffer (2) and overrides the last value
+    slidingTimeWindowReservoir.update(3L);
+    snapshot = slidingTimeWindowReservoir.getSnapshot();
+    assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 3L)));
+    assertEquals(2, snapshot.getSize());
   }
 
   @Test
   public void testRemoveExpiredValues() {
     SlidingTimeWindowReservoir slidingTimeWindowReservoir =
-        new SlidingTimeWindowReservoir(300, clock);
+        new SlidingTimeWindowReservoir(300, 8, clock);
     when(clock.currentTimeMillis()).thenReturn(0L);
     slidingTimeWindowReservoir.update(1L);
 
@@ -85,4 +91,5 @@ public class TestSlidingTimeWindowReservoir {
     assertTrue(snapshot.getValues().containsAll(Arrays.asList(3L, 4L)));
     assertEquals(2, snapshot.getSize());
   }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/adf4f39a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
index 63c183f..8076e02 100644
--- a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
+++ b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
@@ -42,7 +42,7 @@ public class TestTimer {
 
   @Test
   public void testDefaultTimerUpdateAndGetSnapshot() {
-    Timer timer = new Timer("test");
+    Timer timer = new Timer("test", 300, clock);
     timer.update(1L);
     timer.update(2L);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/adf4f39a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
index b4d6f35..ad37447 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
@@ -19,10 +19,14 @@
 
 package org.apache.samza.container
 
+
+import org.apache.samza.metrics.{Timer, SlidingTimeWindowReservoir, MetricsRegistryMap}
+import org.apache.samza.util.Clock
 import org.junit.Test
 import org.junit.Assert._
 import org.mockito.Matchers
 import org.mockito.Mockito._
+import org.mockito.internal.util.reflection.Whitebox
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.scalatest.junit.AssertionsForJUnit
@@ -183,7 +187,19 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat
     var now = 0L
     val consumers = mock[SystemConsumers]
     when(consumers.choose).thenReturn(envelope0)
-    val testMetrics = new SamzaContainerMetrics
+    val clock = new Clock {
+      var c = 0L
+      def currentTimeMillis: Long = {
+        c += 1L
+        c
+      }
+    }
+    val testMetrics = new SamzaContainerMetrics("test", new MetricsRegistryMap() {
+      override def newTimer(group: String, name: String) = {
+        newTimer(group, new Timer(name, new SlidingTimeWindowReservoir(300000, clock)))
+      }
+    })
+
     val runLoop = new RunLoop(
       taskInstances = getMockTaskInstances,
       consumerMultiplexer = consumers,