You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2015/12/08 23:57:09 UTC
samza git commit: SAMZA-834: fix perf degradation due to frequent update of Timer metrics
Repository: samza
Updated Branches:
refs/heads/master d859378f6 -> adf4f39af
SAMZA-834: fix perf degradation due to frequent update of Timer metrics
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/adf4f39a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/adf4f39a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/adf4f39a
Branch: refs/heads/master
Commit: adf4f39af6666aeee1e8bc7f2c9ba6d208c6fa6e
Parents: d859378
Author: Xinyu Liu <xi...@linkedin.com>
Authored: Tue Dec 8 14:56:54 2015 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Tue Dec 8 14:56:54 2015 -0800
----------------------------------------------------------------------
.../metrics/SlidingTimeWindowReservoir.java | 26 +++++++++++++++-----
.../java/org/apache/samza/metrics/Timer.java | 12 +++++++++
.../metrics/TestSlidingTimeWindowReservoir.java | 15 ++++++++---
.../org/apache/samza/metrics/TestTimer.java | 2 +-
.../apache/samza/container/TestRunLoop.scala | 18 +++++++++++++-
5 files changed, 61 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/adf4f39a/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java b/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java
index df54359..4116473 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java
@@ -31,9 +31,9 @@ import org.apache.samza.util.Clock;
public class SlidingTimeWindowReservoir implements Reservoir {
/**
- * Allow this amount of values to have the same updating time.
+ * default collision buffer
*/
- private static final int TIME_COLLISION_BUFFER = 256;
+ private static final int DEFAULT_TIME_COLLISION_BUFFER = 1;
/**
* Run {@link #removeExpireValues} once every this amount of {@link #update}s
@@ -46,8 +46,13 @@ public class SlidingTimeWindowReservoir implements Reservoir {
private static final int DEFAULT_WINDOW_SIZE_MS = 300000;
/**
+ * Allow this amount of values to have the same updating time.
+ */
+ private final int collisionBuffer;
+
+ /**
* Size of the window. The unit is millisecond. It is as
- * <code>TIME_COLLISION_BUFFER</code> times big as the original window size.
+ * <code>collisionBuffer</code> times big as the original window size.
*/
private final long windowMs;
@@ -93,11 +98,16 @@ public class SlidingTimeWindowReservoir implements Reservoir {
}
public SlidingTimeWindowReservoir(long windowMs, Clock clock) {
- this.windowMs = windowMs * TIME_COLLISION_BUFFER;
+ this(windowMs, DEFAULT_TIME_COLLISION_BUFFER, clock);
+ }
+
+ public SlidingTimeWindowReservoir(long windowMs, int collisionBuffer, Clock clock) {
+ this.windowMs = windowMs * collisionBuffer;
this.storage = new ConcurrentSkipListMap<Long, Long>();
this.count = new AtomicLong();
this.lastUpdatingTime = new AtomicLong();
this.clock = clock;
+ this.collisionBuffer = collisionBuffer;
}
@Override
@@ -126,15 +136,19 @@ public class SlidingTimeWindowReservoir implements Reservoir {
* value's, use the last updating time + 1 as the new updating time. This
* operation guarantees all the updating times in the <code>storage</code>
* strictly increment. No override happens before reaching the
- * <code>TIME_COLLISION_BUFFER</code>.
+ * <code>collisionBuffer</code>.
*
* @return the updating time
*/
private long getUpdatingTime() {
while (true) {
long oldTime = lastUpdatingTime.get();
- long newTime = clock.currentTimeMillis() * TIME_COLLISION_BUFFER;
+ long newTime = clock.currentTimeMillis() * collisionBuffer;
long updatingTime = newTime > oldTime ? newTime : oldTime + 1;
+ // make sure the updateTime doesn't overflow to the next millisecond
+ if (updatingTime == newTime + collisionBuffer) {
+ --updatingTime;
+ }
// make sure no other threads modify the lastUpdatingTime
if (lastUpdatingTime.compareAndSet(oldTime, updatingTime)) {
return updatingTime;
http://git-wip-us.apache.org/repos/asf/samza/blob/adf4f39a/samza-api/src/main/java/org/apache/samza/metrics/Timer.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/Timer.java b/samza-api/src/main/java/org/apache/samza/metrics/Timer.java
index b49d147..96715e8 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/Timer.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/Timer.java
@@ -52,6 +52,18 @@ public class Timer implements Metric {
}
/**
+ * Construct a {@link Timer} with given window size and collision buffer
+ *
+ * @param name name of this timer
+ * @param windowMs the window size. unit is millisecond
+ * @param collisionBuffer amount of collisions allowed in one millisecond.
+ * @param clock the clock for the reservoir
+ */
+ public Timer(String name, long windowMs, int collisionBuffer, Clock clock) {
+ this(name, new SlidingTimeWindowReservoir(windowMs, collisionBuffer, clock));
+ }
+
+ /**
* Construct a {@link Timer} with given {@link Reservoir}
*
* @param name name of this timer
http://git-wip-us.apache.org/repos/asf/samza/blob/adf4f39a/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java b/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
index d392b32..aca0f9d 100644
--- a/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
+++ b/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
@@ -34,7 +34,7 @@ public class TestSlidingTimeWindowReservoir {
@Test
public void testUpdateSizeSnapshot() {
SlidingTimeWindowReservoir slidingTimeWindowReservoir =
- new SlidingTimeWindowReservoir(300, clock);
+ new SlidingTimeWindowReservoir(300, 8, clock);
when(clock.currentTimeMillis()).thenReturn(0L);
slidingTimeWindowReservoir.update(1L);
@@ -55,20 +55,26 @@ public class TestSlidingTimeWindowReservoir {
@Test
public void testDuplicateTime() {
SlidingTimeWindowReservoir slidingTimeWindowReservoir =
- new SlidingTimeWindowReservoir(300, clock);
- when(clock.currentTimeMillis()).thenReturn(0L);
+ new SlidingTimeWindowReservoir(300, 2, clock);
+ when(clock.currentTimeMillis()).thenReturn(1L);
slidingTimeWindowReservoir.update(1L);
slidingTimeWindowReservoir.update(2L);
Snapshot snapshot = slidingTimeWindowReservoir.getSnapshot();
assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L)));
assertEquals(2, snapshot.getSize());
+
+ // update causes collision, will override the last update
+ slidingTimeWindowReservoir.update(3L);
+ snapshot = slidingTimeWindowReservoir.getSnapshot();
+ assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 3L)));
+ assertEquals(2, snapshot.getSize());
}
@Test
public void testRemoveExpiredValues() {
SlidingTimeWindowReservoir slidingTimeWindowReservoir =
- new SlidingTimeWindowReservoir(300, clock);
+ new SlidingTimeWindowReservoir(300, 8, clock);
when(clock.currentTimeMillis()).thenReturn(0L);
slidingTimeWindowReservoir.update(1L);
@@ -85,4 +91,5 @@ public class TestSlidingTimeWindowReservoir {
assertTrue(snapshot.getValues().containsAll(Arrays.asList(3L, 4L)));
assertEquals(2, snapshot.getSize());
}
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/adf4f39a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
index 63c183f..8076e02 100644
--- a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
+++ b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
@@ -42,7 +42,7 @@ public class TestTimer {
@Test
public void testDefaultTimerUpdateAndGetSnapshot() {
- Timer timer = new Timer("test");
+ Timer timer = new Timer("test", 300, clock);
timer.update(1L);
timer.update(2L);
http://git-wip-us.apache.org/repos/asf/samza/blob/adf4f39a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
index b4d6f35..ad37447 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
@@ -19,10 +19,14 @@
package org.apache.samza.container
+
+import org.apache.samza.metrics.{Timer, SlidingTimeWindowReservoir, MetricsRegistryMap}
+import org.apache.samza.util.Clock
import org.junit.Test
import org.junit.Assert._
import org.mockito.Matchers
import org.mockito.Mockito._
+import org.mockito.internal.util.reflection.Whitebox
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.junit.AssertionsForJUnit
@@ -183,7 +187,19 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat
var now = 0L
val consumers = mock[SystemConsumers]
when(consumers.choose).thenReturn(envelope0)
- val testMetrics = new SamzaContainerMetrics
+ val clock = new Clock {
+ var c = 0L
+ def currentTimeMillis: Long = {
+ c += 1L
+ c
+ }
+ }
+ val testMetrics = new SamzaContainerMetrics("test", new MetricsRegistryMap() {
+ override def newTimer(group: String, name: String) = {
+ newTimer(group, new Timer(name, new SlidingTimeWindowReservoir(300000, clock)))
+ }
+ })
+
val runLoop = new RunLoop(
taskInstances = getMockTaskInstances,
consumerMultiplexer = consumers,