You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2014/08/05 08:48:25 UTC
git commit: SAMZA-349: add Timer in Metrics
Repository: incubator-samza
Updated Branches:
refs/heads/master 38d659b33 -> e603a2794
SAMZA-349: add Timer in Metrics
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/e603a279
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/e603a279
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/e603a279
Branch: refs/heads/master
Commit: e603a27947ed0940167c72ae4fbd8eacd8c7fcbf
Parents: 38d659b
Author: Yan Fang <ya...@gmail.com>
Authored: Mon Aug 4 23:48:02 2014 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Mon Aug 4 23:48:02 2014 -0700
----------------------------------------------------------------------
build.gradle | 1 +
.../apache/samza/metrics/MetricsRegistry.java | 16 ++
.../apache/samza/metrics/MetricsVisitor.java | 4 +
.../ReadableMetricsRegistryListener.java | 2 +
.../org/apache/samza/metrics/Reservoir.java | 46 ++++++
.../metrics/SlidingTimeWindowReservoir.java | 150 +++++++++++++++++++
.../java/org/apache/samza/metrics/Snapshot.java | 96 ++++++++++++
.../java/org/apache/samza/metrics/Timer.java | 98 ++++++++++++
.../apache/samza/util/NoOpMetricsRegistry.java | 13 +-
.../metrics/TestSlidingTimeWindowReservoir.java | 86 +++++++++++
.../org/apache/samza/metrics/TestSnapshot.java | 45 ++++++
.../org/apache/samza/metrics/TestTimer.java | 70 +++++++++
.../samza/util/TestNoOpMetricsRegistry.java | 11 +-
.../apache/samza/metrics/MetricsHelper.scala | 6 +-
.../samza/metrics/MetricsRegistryMap.scala | 13 ++
.../samza/metrics/reporter/JmxReporter.scala | 17 ++-
.../reporter/MetricsSnapshotReporter.scala | 2 +
.../webapp/ApplicationMasterRestServlet.scala | 3 +
18 files changed, 675 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 3ad5fe3..93ec947 100644
--- a/build.gradle
+++ b/build.gradle
@@ -102,6 +102,7 @@ project(':samza-api') {
dependencies {
testCompile "junit:junit:$junitVersion"
+ testCompile "org.mockito:mockito-all:$mockitoVersion"
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
index 1031e45..5a00d01 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
@@ -63,4 +63,20 @@ public interface MetricsRegistry {
* @return Gauge was registered
*/
<T> Gauge<T> newGauge(String group, Gauge<T> value);
+
+ /**
+ * Create and Register a new {@link org.apache.samza.metrics.Timer}
+ * @param group Group for this Timer
+ * @param name Name of to-be-created Timer
+ * @return New Timer instance
+ */
+ Timer newTimer(String group, String name);
+
+ /**
+ * Register existing {@link org.apache.samza.metrics.Timer} with this registry
+ * @param group Group for this Timer
+ * @param timer Existing Timer to register
+ * @return Timer that was registered
+ */
+ Timer newTimer(String group, Timer timer);
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java
index f4f756a..75abfe7 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java
@@ -29,11 +29,15 @@ public abstract class MetricsVisitor {
public abstract <T> void gauge(Gauge<T> gauge);
+ public abstract void timer(Timer timer);
+
public void visit(Metric metric) {
if (metric instanceof Counter) {
counter((Counter) metric);
} else if (metric instanceof Gauge<?>) {
gauge((Gauge<?>) metric);
+ } else if (metric instanceof Timer) {
+ timer((Timer) metric);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java b/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java
index a16378f..739d68f 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java
@@ -23,4 +23,6 @@ public interface ReadableMetricsRegistryListener {
void onCounter(String group, Counter counter);
void onGauge(String group, Gauge<?> gauge);
+
+ void onTimer(String group, Timer timer);
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/main/java/org/apache/samza/metrics/Reservoir.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/Reservoir.java b/samza-api/src/main/java/org/apache/samza/metrics/Reservoir.java
new file mode 100644
index 0000000..b45e433
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/metrics/Reservoir.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+/**
+ * A reservoir interface to store and update long values and to summarize them as a {@link Snapshot}
+ */
+public interface Reservoir {
+ /**
+ * Return the number of values in this reservoir
+ *
+ * @return the number of values
+ */
+ int size();
+
+ /**
+ * Update the reservoir with the new value
+ *
+ * @param value the new value to record
+ */
+ void update(long value);
+
+ /**
+ * Return a {@link Snapshot} of this reservoir
+ *
+ * @return a statistical snapshot of this reservoir
+ */
+ Snapshot getSnapshot();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java b/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java
new file mode 100644
index 0000000..df54359
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.samza.util.Clock;
+
+/**
+ * A {@link Reservoir} implementation that keeps the values recorded within a
+ * sliding time window; values older than the window are removed.
+ */
+public class SlidingTimeWindowReservoir implements Reservoir {
+
+ /**
+ * Scale factor applied to clock readings to form keys, leaving room for this many values per millisecond.
+ */
+ private static final int TIME_COLLISION_BUFFER = 256;
+
+ /**
+ * Run {@link #removeExpireValues} once every this many {@link #update} calls.
+ */
+ private static final int REMOVE_IN_UPDATE_THRESHOLD = 256;
+
+ /**
+ * Default window size in milliseconds.
+ */
+ private static final int DEFAULT_WINDOW_SIZE_MS = 300000;
+
+ /**
+ * Size of the window in key units: the window size in milliseconds multiplied
+ * by <code>TIME_COLLISION_BUFFER</code>, matching the scaling of the keys.
+ */
+ private final long windowMs;
+
+ /**
+ * A concurrent sorted map (value's updating time -> value)
+ */
+ private final ConcurrentSkipListMap<Long, Long> storage;
+
+ /**
+ * Number of {@link #update} calls so far; used to schedule the periodic removal of expired values.
+ */
+ private final AtomicLong count;
+
+ /**
+ * The most recent updating time returned by {@link #getUpdatingTime}.
+ */
+ private final AtomicLong lastUpdatingTime;
+
+ private final Clock clock;
+
+ /**
+ * Default constructor using the default window size and the system clock
+ */
+ public SlidingTimeWindowReservoir() {
+ this(DEFAULT_WINDOW_SIZE_MS, new Clock() {
+ public long currentTimeMillis() {
+ return System.currentTimeMillis();
+ }
+ });
+ }
+
+ /**
+ * Construct the SlidingTimeWindowReservoir with the given window size and the system clock
+ *
+ * @param windowMs the size of the window. unit is millisecond.
+ */
+ public SlidingTimeWindowReservoir(long windowMs) {
+ this(windowMs, new Clock() {
+ public long currentTimeMillis() {
+ return System.currentTimeMillis();
+ }
+ });
+ }
+
+ public SlidingTimeWindowReservoir(long windowMs, Clock clock) {
+ this.windowMs = windowMs * TIME_COLLISION_BUFFER;
+ this.storage = new ConcurrentSkipListMap<Long, Long>();
+ this.count = new AtomicLong();
+ this.lastUpdatingTime = new AtomicLong();
+ this.clock = clock;
+ }
+
+ @Override
+ public int size() {
+ removeExpireValues();
+ return storage.size();
+ }
+
+ @Override
+ public void update(long value) {
+ if (count.incrementAndGet() % REMOVE_IN_UPDATE_THRESHOLD == 0) {
+ removeExpireValues();
+ }
+ storage.put(getUpdatingTime(), value);
+ }
+
+ /**
+ * Remove the values whose keys fall before the start of the current window. Obtaining the current time here also advances <code>lastUpdatingTime</code>.
+ */
+ private void removeExpireValues() {
+ storage.headMap(getUpdatingTime() - windowMs).clear();
+ }
+
+ /**
+ * Return the new updating time: the clock reading scaled by
+ * <code>TIME_COLLISION_BUFFER</code>, or the last updating time + 1 when the
+ * scaled reading is not greater than it. The compare-and-set retry loop keeps
+ * the updating times strictly increasing, so keys in <code>storage</code> are
+ * not reused; a burst longer than the buffer spills into the next millisecond's keys.
+ *
+ * @return the updating time
+ */
+ private long getUpdatingTime() {
+ while (true) {
+ long oldTime = lastUpdatingTime.get();
+ long newTime = clock.currentTimeMillis() * TIME_COLLISION_BUFFER;
+ long updatingTime = newTime > oldTime ? newTime : oldTime + 1;
+ // retry if another thread changed lastUpdatingTime since it was read
+ if (lastUpdatingTime.compareAndSet(oldTime, updatingTime)) {
+ return updatingTime;
+ }
+ }
+ }
+
+ @Override
+ public Snapshot getSnapshot() {
+ removeExpireValues();
+ return new Snapshot(storage.values());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java b/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java
new file mode 100644
index 0000000..7666909
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A statistical snapshot of a collection of values, copied and sorted in ascending order at construction. NOTE(review): size is read from the constructor argument after the copy is made, not from the copy, so the two can disagree if the argument changes in between (e.g. a live concurrent map view) - consider using the copy's size.
+ */
+public class Snapshot {
+ private final ArrayList<Long> values;
+ private final int size;
+
+ public Snapshot(Collection<Long> values) {
+ this.values = new ArrayList<Long>(values);
+ this.size = values.size();
+ Collections.sort(this.values);
+ }
+
+ /**
+ * Get the maximum value in the collection
+ *
+ * @return maximum value, or 0 if the snapshot is empty
+ */
+ public long getMax() {
+ if (size == 0) {
+ return 0;
+ }
+ return values.get(size - 1);
+ }
+
+ /**
+ * Get the minimum value in the collection
+ *
+ * @return minimum value, or 0 if the snapshot is empty
+ */
+ public long getMin() {
+ if (size == 0) {
+ return 0;
+ }
+ return values.get(0);
+ }
+
+ /**
+ * Get the average of the values in the collection
+ *
+ * @return average value, or 0 if the snapshot is empty
+ */
+ public double getAverage() {
+ if (size == 0) {
+ return 0;
+ }
+ double sum = 0;
+ for (long value : values) {
+ sum += value;
+ }
+ return sum / size;
+ }
+
+ /**
+ * Get the number of values in the collection
+ *
+ * @return size of the collection
+ */
+ public int getSize() {
+ return size;
+ }
+
+ /**
+ * Return a copy of the entire list of values, sorted in ascending order
+ *
+ * @return a new list holding the values
+ */
+ public ArrayList<Long> getValues() {
+ return (ArrayList<Long>) values.clone();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/main/java/org/apache/samza/metrics/Timer.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/Timer.java b/samza-api/src/main/java/org/apache/samza/metrics/Timer.java
new file mode 100644
index 0000000..b49d147
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/metrics/Timer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+import org.apache.samza.util.Clock;
+
+/**
+ * A timer metric that stores time durations in a {@link Reservoir} and provides
+ * a {@link Snapshot} of the stored durations.
+ */
+public class Timer implements Metric {
+
+ private final String name;
+ private final Reservoir reservoir;
+
+ /**
+ * Default constructor. It uses a {@link SlidingTimeWindowReservoir} created
+ * with its no-argument constructor as the reservoir.
+ *
+ * @param name name of this timer
+ */
+ public Timer(String name) {
+ this(name, new SlidingTimeWindowReservoir());
+ }
+
+ /**
+ * Construct a {@link Timer} backed by a {@link SlidingTimeWindowReservoir} with the given window size and clock
+ *
+ * @param name name of this timer
+ * @param windowMs the window size. unit is millisecond
+ * @param clock the clock for the reservoir
+ */
+ public Timer(String name, long windowMs, Clock clock) {
+ this(name, new SlidingTimeWindowReservoir(windowMs, clock));
+ }
+
+ /**
+ * Construct a {@link Timer} with given {@link Reservoir}
+ *
+ * @param name name of this timer
+ * @param reservoir the given reservoir
+ */
+ public Timer(String name, Reservoir reservoir) {
+ this.name = name;
+ this.reservoir = reservoir;
+ }
+
+ /**
+ * Add the time duration. Durations that are zero or negative are ignored.
+ *
+ * @param duration time duration; recorded only when greater than 0
+ */
+ public void update(long duration) {
+ if (duration > 0) {
+ reservoir.update(duration);
+ }
+ }
+
+ /**
+ * Get the {@link Snapshot} of the underlying reservoir
+ *
+ * @return a statistical snapshot
+ */
+ public Snapshot getSnapshot() {
+ return reservoir.getSnapshot();
+ }
+
+ @Override
+ public void visit(MetricsVisitor visitor) {
+ visitor.timer(this);
+ }
+
+ /**
+ * Get the name of the timer
+ *
+ * @return name of the timer
+ */
+ public String getName() {
+ return name;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java b/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
index d7bc4a9..3df855c 100644
--- a/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
+++ b/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
@@ -22,6 +22,7 @@ package org.apache.samza.util;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
/**
* {@link org.apache.samza.metrics.MetricsRegistry} implementation for when no actual metrics need to be
@@ -47,4 +48,14 @@ public class NoOpMetricsRegistry implements MetricsRegistry {
public <T> Gauge<T> newGauge(String group, Gauge<T> gauge) {
return gauge;
}
-}
+
+ @Override
+ public Timer newTimer(String group, String name) {
+ return new Timer(name);
+ }
+
+ @Override
+ public Timer newTimer(String group, Timer timer) {
+ return timer;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java b/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
new file mode 100644
index 0000000..eb5043b
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+
+import org.apache.samza.util.Clock;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+public class TestSlidingTimeWindowReservoir {
+
+ private final Clock clock = mock(Clock.class);
+
+ @Test
+ public void testUpdateSizeSnapshot() {
+ SlidingTimeWindowReservoir slidingTimeWindowReservoir = new SlidingTimeWindowReservoir(300, clock);
+
+ when(clock.currentTimeMillis()).thenReturn(0L);
+ slidingTimeWindowReservoir.update(1L);
+
+ when(clock.currentTimeMillis()).thenReturn(1L);
+ slidingTimeWindowReservoir.update(2L);
+
+ when(clock.currentTimeMillis()).thenReturn(2L);
+ slidingTimeWindowReservoir.update(3L);
+
+ assertEquals(3, slidingTimeWindowReservoir.size());
+
+ Snapshot snapshot = slidingTimeWindowReservoir.getSnapshot();
+ assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L, 3L)));
+ assertTrue(snapshot.getSize() == 3);
+ }
+
+ @Test
+ public void testDuplicateTime() {
+ SlidingTimeWindowReservoir slidingTimeWindowReservoir = new SlidingTimeWindowReservoir(300, clock);
+ when(clock.currentTimeMillis()).thenReturn(0L);
+ slidingTimeWindowReservoir.update(1L);
+ slidingTimeWindowReservoir.update(2L);
+
+ Snapshot snapshot = slidingTimeWindowReservoir.getSnapshot();
+ assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L)));
+ assertTrue(snapshot.getSize() == 2);
+ }
+
+ @Test
+ public void testRemoveExpiredValues() {
+ SlidingTimeWindowReservoir slidingTimeWindowReservoir = new SlidingTimeWindowReservoir(300, clock);
+ when(clock.currentTimeMillis()).thenReturn(0L);
+ slidingTimeWindowReservoir.update(1L);
+
+ when(clock.currentTimeMillis()).thenReturn(100L);
+ slidingTimeWindowReservoir.update(2L);
+
+ when(clock.currentTimeMillis()).thenReturn(301L);
+ slidingTimeWindowReservoir.update(3L);
+
+ when(clock.currentTimeMillis()).thenReturn(500L);
+ slidingTimeWindowReservoir.update(4L);
+
+ Snapshot snapshot = slidingTimeWindowReservoir.getSnapshot();
+ assertTrue(snapshot.getValues().containsAll(Arrays.asList(3L, 4L)));
+ assertTrue(snapshot.getSize() == 2);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/test/java/org/apache/samza/metrics/TestSnapshot.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestSnapshot.java b/samza-api/src/test/java/org/apache/samza/metrics/TestSnapshot.java
new file mode 100644
index 0000000..b7aecb2
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/metrics/TestSnapshot.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.junit.Test;
+
+public class TestSnapshot {
+
+ @Test
+ public void testGetMaxMinAverageSize() {
+ Snapshot snapshot = new Snapshot(Arrays.asList(1L, 2L, 3L, 4L, 5L));
+ assertEquals(5, snapshot.getMax());
+ assertEquals(1, snapshot.getMin());
+ assertEquals(3, snapshot.getAverage(), 0);
+ assertEquals(5, snapshot.getSize());
+
+ Snapshot emptySnapshot = new Snapshot(new ArrayList<Long>());
+ assertEquals(0, emptySnapshot.getMax());
+ assertEquals(0, emptySnapshot.getMin());
+ assertEquals(0, emptySnapshot.getAverage(), 0);
+ assertEquals(0, emptySnapshot.getSize());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
new file mode 100644
index 0000000..dcc3cb8
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+
+import org.apache.samza.util.Clock;
+import org.junit.Test;
+
+public class TestTimer {
+
+ // mock clock: every call advances the time by 100 (returns 100, 200, 300, ...)
+ private final Clock clock = new Clock() {
+ long value = 0;
+
+ @Override
+ public long currentTimeMillis() {
+ return value += 100;
+ }
+ };
+
+ @Test
+ public void testDefaultTimerUpdateAndGetSnapshot() {
+ Timer timer = new Timer("test");
+ timer.update(1L);
+ timer.update(2L);
+
+ Snapshot snapshot = timer.getSnapshot();
+ assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L)));
+ assertTrue(snapshot.getValues().size() == 2);
+ }
+
+ @Test
+ public void testTimerWithDifferentWindowSize() {
+ Timer timer = new Timer("test", 300, clock);
+ timer.update(1L);
+ timer.update(2L);
+ timer.update(3L);
+
+ Snapshot snapshot = timer.getSnapshot();
+ assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L, 3L)));
+ assertTrue(snapshot.getValues().size() == 3);
+
+ // the time is 500 for update(4L): the 3 updates above and the getSnapshot
+ // call each read the clock once, so this update is the fifth clock reading
+ timer.update(4L);
+ Snapshot snapshot2 = timer.getSnapshot();
+ assertTrue(snapshot2.getValues().containsAll(Arrays.asList(3L, 4L)));
+ assertTrue(snapshot2.getValues().size() == 2);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java b/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java
index 78d2824..2d0034f 100644
--- a/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java
+++ b/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java
@@ -22,9 +22,9 @@ package org.apache.samza.util;
import static org.junit.Assert.*;
import org.junit.Test;
-
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.Timer;
public class TestNoOpMetricsRegistry {
@Test
@@ -37,6 +37,9 @@ public class TestNoOpMetricsRegistry {
Gauge<String> gauge2 = registry.newGauge("testg", "b", "2");
Gauge<String> gauge3 = registry.newGauge("testg", "c", "3");
Gauge<String> gauge4 = registry.newGauge("testg2", "d", "4");
+ Timer timer1 = registry.newTimer("testt", "a");
+ Timer timer2 = registry.newTimer("testt", "b");
+ Timer timer3 = registry.newTimer("testt2", "c");
counter1.inc();
counter2.inc(2);
counter3.inc(4);
@@ -44,6 +47,9 @@ public class TestNoOpMetricsRegistry {
gauge2.set("6");
gauge3.set("7");
gauge4.set("8");
+ timer1.update(1L);
+ timer2.update(2L);
+ timer3.update(3L);
assertEquals(counter1.getCount(), 1);
assertEquals(counter2.getCount(), 2);
assertEquals(counter3.getCount(), 4);
@@ -51,5 +57,8 @@ public class TestNoOpMetricsRegistry {
assertEquals(gauge2.getValue(), "6");
assertEquals(gauge3.getValue(), "7");
assertEquals(gauge4.getValue(), "8");
+ assertEquals(timer1.getSnapshot().getAverage(), 1, 0);
+ assertEquals(timer2.getSnapshot().getAverage(), 2, 0);
+ assertEquals(timer3.getSnapshot().getAverage(), 3, 0);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
index 8043f37..e5d6b1e 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
@@ -23,7 +23,7 @@ import org.apache.samza.container.SamzaContainerMetrics
/**
* MetricsHelper is a little helper class to make it easy to register and
- * manage counters and gauges.
+ * manage counters, gauges and timers.
*/
trait MetricsHelper {
val group = this.getClass.getName
@@ -48,6 +48,10 @@ trait MetricsHelper {
})
}
+ def newTimer(name: String) = {
+ registry.newTimer(group, (getPrefix + name).toLowerCase)
+ }
+
/**
* Returns a prefix for metric names.
*/
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
index da83f20..aac241b 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
@@ -62,6 +62,19 @@ class MetricsRegistryMap(val name: String) extends ReadableMetricsRegistry with
newGauge(group, new Gauge[T](name, value))
}
+ def newTimer(group: String, timer: Timer) = {
+ debug("Add new timer %s %s %s." format (group, timer.getName, timer))
+ putAndGetGroup(group).putIfAbsent(timer.getName, timer)
+ val realTimer = metrics.get(group).get(timer.getName).asInstanceOf[Timer]
+ listeners.foreach(_.onTimer(group, realTimer))
+ realTimer
+ }
+
+ def newTimer(group: String, name: String) = {
+ debug("Creating new timer %s %s." format (group, name))
+ newTimer(group, new Timer(name))
+ }
+
private def putAndGetGroup(group: String) = {
metrics.putIfAbsent(group, new ConcurrentHashMap[String, Metric])
metrics.get(group)
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
index 8814e68..d66efc2 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
@@ -26,6 +26,7 @@ import javax.management.ObjectName
import org.apache.samza.config.Config
import org.apache.samza.metrics.Counter
import org.apache.samza.metrics.Gauge
+import org.apache.samza.metrics.Timer
import org.apache.samza.metrics.MetricsReporter
import org.apache.samza.metrics.MetricsReporterFactory
import org.apache.samza.metrics.ReadableMetricsRegistry
@@ -47,8 +48,9 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging {
registry.getGroup(group).foreach {
case (name, metric) =>
metric.visit(new MetricsVisitor {
- def counter(counter: Counter) = registerBean(new JmxCounter(counter, getObjectName(group, name, sources(registry))));
+ def counter(counter: Counter) = registerBean(new JmxCounter(counter, getObjectName(group, name, sources(registry))))
def gauge[T](gauge: Gauge[T]) = registerBean(new JmxGauge(gauge.asInstanceOf[Gauge[Object]], getObjectName(group, name, sources(registry))))
+ // Wrap the visited timer in a JmxTimer bean and register it.
+ def timer(timer: Timer) = {
+   val beanName = getObjectName(group, name, sources(registry))
+   registerBean(new JmxTimer(timer, beanName))
+ }
})
}
})
@@ -66,6 +68,10 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging {
def onGauge(group: String, gauge: Gauge[_]) {
registerBean(new JmxGauge(gauge.asInstanceOf[Gauge[Object]], getObjectName(group, gauge.getName, source)))
}
+
+ // Listener callback: wrap the reported timer in a JmxTimer bean and register it.
+ def onTimer(group: String, timer: Timer) {
+   val beanName = getObjectName(group, timer.getName, source)
+   val bean = new JmxTimer(timer, beanName)
+   registerBean(bean)
+ }
}
} else {
warn("Trying to re-register a registry for source %s. Ignoring." format source)
@@ -137,6 +143,15 @@ class JmxCounter(c: org.apache.samza.metrics.Counter, on: ObjectName) extends Jm
def objectName = on
}
+/**
+ * Management interface implemented by JmxTimer. On top of MetricMBean it
+ * declares a single read-only getter for a timer's average time.
+ * NOTE(review): the getter is presumably surfaced as a JMX attribute through
+ * the standard MBean naming convention; confirm against registerBean.
+ */
+trait JmxTimerMBean extends MetricMBean {
+ def getAverageTime(): Double
+}
+
+/**
+ * JmxTimerMBean implementation backed by a Samza Timer. It reports the
+ * average of a snapshot taken at call time and carries the ObjectName
+ * supplied at construction.
+ */
+class JmxTimer(t: org.apache.samza.metrics.Timer, on: ObjectName) extends JmxTimerMBean {
+  def objectName = on
+
+  def getAverageTime() = {
+    val snapshot = t.getSnapshot()
+    snapshot.getAverage()
+  }
+}
+
class JmxReporterFactory extends MetricsReporterFactory with Logging {
def getMetricsReporter(name: String, containerName: String, config: Config) = {
info("Creating JMX reporter with name %s." format name)
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
index 9a56754..319c74d 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
@@ -25,6 +25,7 @@ import scala.collection.JavaConversions._
import grizzled.slf4j.Logging
import org.apache.samza.metrics.Counter
import org.apache.samza.metrics.Gauge
+import org.apache.samza.metrics.Timer
import org.apache.samza.metrics.MetricsReporter
import org.apache.samza.metrics.MetricsVisitor
import org.apache.samza.metrics.ReadableMetricsRegistry
@@ -122,6 +123,7 @@ class MetricsSnapshotReporter(
metric.visit(new MetricsVisitor {
def counter(counter: Counter) = groupMsg.put(name, counter.getCount: java.lang.Long)
def gauge[T](gauge: Gauge[T]) = groupMsg.put(name, gauge.getValue.asInstanceOf[Object])
+ // A timer is reported as the boxed average of its snapshot, keyed by metric name.
+ def timer(timer: Timer) = {
+   val average: java.lang.Double = timer.getSnapshot().getAverage()
+   groupMsg.put(name, average)
+ }
})
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
index d10dc38..27fbe2d 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
@@ -54,6 +54,9 @@ class ApplicationMasterRestServlet(config: Config, state: SamzaAppMasterState, r
def gauge[T](gauge: Gauge[T]) =
groupMap.put(gauge.getName, gauge.getValue.asInstanceOf[java.lang.Object])
+
+ // Publish the timer under its own name as the boxed average of its snapshot.
+ def timer(timer: Timer) = {
+   val metricName = timer.getName
+   val average: java.lang.Double = timer.getSnapshot().getAverage
+   groupMap.put(metricName, average)
+ }
})
}