You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2014/08/05 08:48:25 UTC

git commit: SAMZA-349: add Timer in Metrics

Repository: incubator-samza
Updated Branches:
  refs/heads/master 38d659b33 -> e603a2794


SAMZA-349: add Timer in Metrics


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/e603a279
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/e603a279
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/e603a279

Branch: refs/heads/master
Commit: e603a27947ed0940167c72ae4fbd8eacd8c7fcbf
Parents: 38d659b
Author: Yan Fang <ya...@gmail.com>
Authored: Mon Aug 4 23:48:02 2014 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Mon Aug 4 23:48:02 2014 -0700

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 .../apache/samza/metrics/MetricsRegistry.java   |  16 ++
 .../apache/samza/metrics/MetricsVisitor.java    |   4 +
 .../ReadableMetricsRegistryListener.java        |   2 +
 .../org/apache/samza/metrics/Reservoir.java     |  46 ++++++
 .../metrics/SlidingTimeWindowReservoir.java     | 150 +++++++++++++++++++
 .../java/org/apache/samza/metrics/Snapshot.java |  96 ++++++++++++
 .../java/org/apache/samza/metrics/Timer.java    |  98 ++++++++++++
 .../apache/samza/util/NoOpMetricsRegistry.java  |  13 +-
 .../metrics/TestSlidingTimeWindowReservoir.java |  86 +++++++++++
 .../org/apache/samza/metrics/TestSnapshot.java  |  45 ++++++
 .../org/apache/samza/metrics/TestTimer.java     |  70 +++++++++
 .../samza/util/TestNoOpMetricsRegistry.java     |  11 +-
 .../apache/samza/metrics/MetricsHelper.scala    |   6 +-
 .../samza/metrics/MetricsRegistryMap.scala      |  13 ++
 .../samza/metrics/reporter/JmxReporter.scala    |  17 ++-
 .../reporter/MetricsSnapshotReporter.scala      |   2 +
 .../webapp/ApplicationMasterRestServlet.scala   |   3 +
 18 files changed, 675 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 3ad5fe3..93ec947 100644
--- a/build.gradle
+++ b/build.gradle
@@ -102,6 +102,7 @@ project(':samza-api') {
 
   dependencies {
     testCompile "junit:junit:$junitVersion"
+    testCompile "org.mockito:mockito-all:$mockitoVersion"
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
index 1031e45..5a00d01 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
@@ -63,4 +63,20 @@ public interface MetricsRegistry {
    * @return Gauge was registered
    */
   <T> Gauge<T> newGauge(String group, Gauge<T> value);
+
+  /**
+   * Create and Register a new {@link org.apache.samza.metrics.Timer}
+   * @param group Group for this Timer
+   * @param name Name of to-be-created Timer
+   * @return New Timer instance
+   */
+  Timer newTimer(String group, String name);
+
+  /**
+   * Register existing {@link org.apache.samza.metrics.Timer} with this registry
+   * @param group Group for this Timer
+   * @param timer Existing Timer to register
+   * @return Timer that was registered
+   */
+  Timer newTimer(String group, Timer timer);
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java
index f4f756a..75abfe7 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java
@@ -29,11 +29,15 @@ public abstract class MetricsVisitor {
 
   public abstract <T> void gauge(Gauge<T> gauge);
 
+  public abstract void timer(Timer timer);
+
   public void visit(Metric metric) {
     if (metric instanceof Counter) {
       counter((Counter) metric);
     } else if (metric instanceof Gauge<?>) {
       gauge((Gauge<?>) metric);
+    } else if (metric instanceof Timer) {
+      timer((Timer) metric);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java b/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java
index a16378f..739d68f 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java
@@ -23,4 +23,6 @@ public interface ReadableMetricsRegistryListener {
   void onCounter(String group, Counter counter);
 
   void onGauge(String group, Gauge<?> gauge);
+
+  void onTimer(String group, Timer timer);
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/main/java/org/apache/samza/metrics/Reservoir.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/Reservoir.java b/samza-api/src/main/java/org/apache/samza/metrics/Reservoir.java
new file mode 100644
index 0000000..b45e433
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/metrics/Reservoir.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+/**
+ * A reservoir interface to store, update and display values
+ */
+public interface Reservoir {
+  /**
+   * Return the number of values in this reservoir
+   *
+   * @return the number of values
+   */
+  int size();
+
+  /**
+   * Update the reservoir with the new value
+   *
+   * @param value the new value to add to the reservoir
+   */
+  void update(long value);
+
+  /**
+   * Return a {@link Snapshot} of this reservoir
+   *
+   * @return a statistical snapshot of this reservoir
+   */
+  Snapshot getSnapshot();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java b/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java
new file mode 100644
index 0000000..df54359
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.samza.util.Clock;
+
+/**
+ * A {@link Reservoir} implementation that stores the values recorded within a
+ * sliding time window
+ */
+public class SlidingTimeWindowReservoir implements Reservoir {
+
+  /**
+   * Allow this number of values to share the same system time (millisecond).
+   */
+  private static final int TIME_COLLISION_BUFFER = 256;
+
+  /**
+   * Run {@link #removeExpireValues} once every this many calls to {@link #update}
+   */
+  private static final int REMOVE_IN_UPDATE_THRESHOLD = 256;
+
+  /**
+   * default window size
+   */
+  private static final int DEFAULT_WINDOW_SIZE_MS = 300000;
+
+  /**
+   * Size of the window, in milliseconds multiplied by
+   * <code>TIME_COLLISION_BUFFER</code> (the same scale as the storage keys).
+   */
+  private final long windowMs;
+
+  /**
+   * A concurrent map (value's updating time -> value)
+   */
+  private final ConcurrentSkipListMap<Long, Long> storage;
+
+  /**
+   * Total number of values updated in the reservoir.
+   */
+  private final AtomicLong count;
+
+  /**
+   * Updating time of the last value.
+   */
+  private final AtomicLong lastUpdatingTime;
+
+  private final Clock clock;
+
+  /**
+   * Default constructor using default window size
+   */
+  public SlidingTimeWindowReservoir() {
+    this(DEFAULT_WINDOW_SIZE_MS, new Clock() {
+      public long currentTimeMillis() {
+        return System.currentTimeMillis();
+      }
+    });
+  }
+
+  /**
+   * Construct the SlidingTimeWindowReservoir with window size
+   *
+   * @param windowMs the size of the window. unit is millisecond.
+   */
+  public SlidingTimeWindowReservoir(long windowMs) {
+    this(windowMs, new Clock() {
+      public long currentTimeMillis() {
+        return System.currentTimeMillis();
+      }
+    });
+  }
+
+  public SlidingTimeWindowReservoir(long windowMs, Clock clock) {
+    this.windowMs = windowMs * TIME_COLLISION_BUFFER;
+    this.storage = new ConcurrentSkipListMap<Long, Long>();
+    this.count = new AtomicLong();
+    this.lastUpdatingTime = new AtomicLong();
+    this.clock = clock;
+  }
+
+  @Override
+  public int size() {
+    removeExpireValues();
+    return storage.size();
+  }
+
+  @Override
+  public void update(long value) {
+    if (count.incrementAndGet() % REMOVE_IN_UPDATE_THRESHOLD == 0) {
+      removeExpireValues();
+    }
+    storage.put(getUpdatingTime(), value);
+  }
+
+  /**
+   * Remove the values that were recorded before the current window
+   */
+  private void removeExpireValues() {
+    storage.headMap(getUpdatingTime() - windowMs).clear();
+  }
+
+  /**
+   * Return the new updating time. If the new value's scaled system time is not
+   * later than the last updating time, use the last updating time + 1 instead.
+   * This guarantees that all the updating times in the <code>storage</code>
+   * strictly increase. No overwrite happens before reaching the
+   * <code>TIME_COLLISION_BUFFER</code>.
+   *
+   * @return the updating time
+   */
+  private long getUpdatingTime() {
+    while (true) {
+      long oldTime = lastUpdatingTime.get();
+      long newTime = clock.currentTimeMillis() * TIME_COLLISION_BUFFER;
+      long updatingTime = newTime > oldTime ? newTime : oldTime + 1;
+      // make sure no other threads modify the lastUpdatingTime
+      if (lastUpdatingTime.compareAndSet(oldTime, updatingTime)) {
+        return updatingTime;
+      }
+    }
+  }
+
+  @Override
+  public Snapshot getSnapshot() {
+    removeExpireValues();
+    return new Snapshot(storage.values());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java b/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java
new file mode 100644
index 0000000..7666909
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A statistical snapshot: a sorted private copy of a collection of values
+ */
+public class Snapshot {
+  private final ArrayList<Long> values;
+  private final int size;
+
+  public Snapshot(Collection<Long> values) {
+    this.values = new ArrayList<Long>(values);
+    this.size = this.values.size(); // count the copy; the input may be a live view
+    Collections.sort(this.values);
+  }
+
+  /**
+   * Get the maximum value in the collection
+   *
+   * @return maximum value, or 0 if the snapshot is empty
+   */
+  public long getMax() {
+    if (size == 0) {
+      return 0;
+    }
+    return values.get(size - 1);
+  }
+
+  /**
+   * Get the minimum value in the collection
+   *
+   * @return minimum value, or 0 if the snapshot is empty
+   */
+  public long getMin() {
+    if (size == 0) {
+      return 0;
+    }
+    return values.get(0);
+  }
+
+  /**
+   * Get the average of the values in the collection
+   *
+   * @return average value, or 0 if the snapshot is empty
+   */
+  public double getAverage() {
+    if (size == 0) {
+      return 0;
+    }
+    double sum = 0;
+    for (long value : values) {
+      sum += value;
+    }
+    return sum / size;
+  }
+
+  /**
+   * Get the number of values in the collection
+   *
+   * @return size of the collection
+   */
+  public int getSize() {
+    return size;
+  }
+
+  /**
+   * Return a copy of the entire list of values, in ascending order
+   *
+   * @return a new list holding the sorted values
+   */
+  public ArrayList<Long> getValues() {
+    return new ArrayList<Long>(values);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/main/java/org/apache/samza/metrics/Timer.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/Timer.java b/samza-api/src/main/java/org/apache/samza/metrics/Timer.java
new file mode 100644
index 0000000..b49d147
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/metrics/Timer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+import org.apache.samza.util.Clock;
+
+/**
+ * A timer metric that stores time duration and provides {@link Snapshot} of the
+ * durations.
+ */
+public class Timer implements Metric {
+
+  private final String name;
+  private final Reservoir reservoir;
+
+  /**
+   * Default constructor. It uses {@link SlidingTimeWindowReservoir} as the
+   * default reservoir.
+   *
+   * @param name name of this timer
+   */
+  public Timer(String name) {
+    this(name, new SlidingTimeWindowReservoir());
+  }
+
+  /**
+   * Construct a {@link Timer} with given window size
+   *
+   * @param name name of this timer
+   * @param windowMs the window size. unit is millisecond
+   * @param clock the clock for the reservoir
+   */
+  public Timer(String name, long windowMs, Clock clock) {
+    this(name, new SlidingTimeWindowReservoir(windowMs, clock));
+  }
+
+  /**
+   * Construct a {@link Timer} with given {@link Reservoir}
+   *
+   * @param name name of this timer
+   * @param reservoir the given reservoir
+   */
+  public Timer(String name, Reservoir reservoir) {
+    this.name = name;
+    this.reservoir = reservoir;
+  }
+
+  /**
+   * Add the time duration. Non-positive durations are ignored.
+   *
+   * @param duration time duration
+   */
+  public void update(long duration) {
+    if (duration > 0) {
+      reservoir.update(duration);
+    }
+  }
+
+  /**
+   * Get the {@link Snapshot}
+   *
+   * @return a statistical snapshot
+   */
+  public Snapshot getSnapshot() {
+    return reservoir.getSnapshot();
+  }
+
+  @Override
+  public void visit(MetricsVisitor visitor) {
+    visitor.timer(this);
+  }
+
+  /**
+   * Get the name of the timer
+   *
+   * @return name of the timer
+   */
+  public String getName() {
+    return name;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java b/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
index d7bc4a9..3df855c 100644
--- a/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
+++ b/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
@@ -22,6 +22,7 @@ package org.apache.samza.util;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Gauge;
 import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
 
 /**
  * {@link org.apache.samza.metrics.MetricsRegistry} implementation for when no actual metrics need to be
@@ -47,4 +48,14 @@ public class NoOpMetricsRegistry implements MetricsRegistry {
   public <T> Gauge<T> newGauge(String group, Gauge<T> gauge) {
     return gauge;
   }
-}
+
+  @Override
+  public Timer newTimer(String group, String name) {
+    return new Timer(name);
+  }
+
+  @Override
+  public Timer newTimer(String group, Timer timer) {
+    return timer;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java b/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
new file mode 100644
index 0000000..eb5043b
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+
+import org.apache.samza.util.Clock;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+public class TestSlidingTimeWindowReservoir {
+
+  private final Clock clock = mock(Clock.class);
+
+  @Test
+  public void testUpdateSizeSnapshot() {
+    SlidingTimeWindowReservoir slidingTimeWindowReservoir = new SlidingTimeWindowReservoir(300, clock);
+
+    when(clock.currentTimeMillis()).thenReturn(0L);
+    slidingTimeWindowReservoir.update(1L);
+
+    when(clock.currentTimeMillis()).thenReturn(1L);
+    slidingTimeWindowReservoir.update(2L);
+
+    when(clock.currentTimeMillis()).thenReturn(2L);
+    slidingTimeWindowReservoir.update(3L);
+
+    assertEquals(3, slidingTimeWindowReservoir.size());
+
+    Snapshot snapshot = slidingTimeWindowReservoir.getSnapshot();
+    assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L, 3L)));
+    assertTrue(snapshot.getSize() == 3);
+  }
+
+  @Test
+  public void testDuplicateTime() {
+    SlidingTimeWindowReservoir slidingTimeWindowReservoir = new SlidingTimeWindowReservoir(300, clock);
+    when(clock.currentTimeMillis()).thenReturn(0L);
+    slidingTimeWindowReservoir.update(1L);
+    slidingTimeWindowReservoir.update(2L);
+
+    Snapshot snapshot = slidingTimeWindowReservoir.getSnapshot();
+    assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L)));
+    assertTrue(snapshot.getSize() == 2);
+  }
+
+  @Test
+  public void testRemoveExpiredValues() {
+    SlidingTimeWindowReservoir slidingTimeWindowReservoir = new SlidingTimeWindowReservoir(300, clock);
+    when(clock.currentTimeMillis()).thenReturn(0L);
+    slidingTimeWindowReservoir.update(1L);
+
+    when(clock.currentTimeMillis()).thenReturn(100L);
+    slidingTimeWindowReservoir.update(2L);
+
+    when(clock.currentTimeMillis()).thenReturn(301L);
+    slidingTimeWindowReservoir.update(3L);
+
+    when(clock.currentTimeMillis()).thenReturn(500L);
+    slidingTimeWindowReservoir.update(4L);
+
+    Snapshot snapshot = slidingTimeWindowReservoir.getSnapshot();
+    assertTrue(snapshot.getValues().containsAll(Arrays.asList(3L, 4L)));
+    assertTrue(snapshot.getSize() == 2);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/test/java/org/apache/samza/metrics/TestSnapshot.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestSnapshot.java b/samza-api/src/test/java/org/apache/samza/metrics/TestSnapshot.java
new file mode 100644
index 0000000..b7aecb2
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/metrics/TestSnapshot.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.junit.Test;
+
+public class TestSnapshot {
+
+  @Test
+  public void testGetMaxMinAverageSize() {
+    Snapshot snapshot = new Snapshot(Arrays.asList(1L, 2L, 3L, 4L, 5L));
+    assertEquals(5, snapshot.getMax());
+    assertEquals(1, snapshot.getMin());
+    assertEquals(3, snapshot.getAverage(), 0);
+    assertEquals(5, snapshot.getSize());
+
+    Snapshot emptySnapshot = new Snapshot(new ArrayList<Long>());
+    assertEquals(0, emptySnapshot.getMax());
+    assertEquals(0, emptySnapshot.getMin());
+    assertEquals(0, emptySnapshot.getAverage(), 0);
+    assertEquals(0, emptySnapshot.getSize());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
new file mode 100644
index 0000000..dcc3cb8
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+
+import org.apache.samza.util.Clock;
+import org.junit.Test;
+
+public class TestTimer {
+
+  // mock clock
+  private final Clock clock = new Clock() {
+    long value = 0;
+
+    @Override
+    public long currentTimeMillis() {
+      return value += 100;
+    }
+  };
+
+  @Test
+  public void testDefaultTimerUpdateAndGetSnapshot() {
+    Timer timer = new Timer("test");
+    timer.update(1L);
+    timer.update(2L);
+
+    Snapshot snapshot = timer.getSnapshot();
+    assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L)));
+    assertTrue(snapshot.getValues().size() == 2);
+  }
+
+  @Test
+  public void testTimerWithDifferentWindowSize() {
+    Timer timer = new Timer("test", 300, clock);
+    timer.update(1L);
+    timer.update(2L);
+    timer.update(3L);
+
+    Snapshot snapshot = timer.getSnapshot();
+    assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L, 3L)));
+    assertTrue(snapshot.getValues().size() == 3);
+
+    // update(4L) happens at time 500: the mock clock advances by 100 on every
+    // call, and it was already called by 3 updates and 1 getSnapshot
+    timer.update(4L);
+    Snapshot snapshot2 = timer.getSnapshot();
+    assertTrue(snapshot2.getValues().containsAll(Arrays.asList(3L, 4L)));
+    assertTrue(snapshot2.getValues().size() == 2);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java b/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java
index 78d2824..2d0034f 100644
--- a/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java
+++ b/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java
@@ -22,9 +22,9 @@ package org.apache.samza.util;
 import static org.junit.Assert.*;
 
 import org.junit.Test;
-
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.Timer;
 
 public class TestNoOpMetricsRegistry {
   @Test
@@ -37,6 +37,9 @@ public class TestNoOpMetricsRegistry {
     Gauge<String> gauge2 = registry.newGauge("testg", "b", "2");
     Gauge<String> gauge3 = registry.newGauge("testg", "c", "3");
     Gauge<String> gauge4 = registry.newGauge("testg2", "d", "4");
+    Timer timer1 = registry.newTimer("testt", "a");
+    Timer timer2 = registry.newTimer("testt", "b");
+    Timer timer3 = registry.newTimer("testt2", "c");
     counter1.inc();
     counter2.inc(2);
     counter3.inc(4);
@@ -44,6 +47,9 @@ public class TestNoOpMetricsRegistry {
     gauge2.set("6");
     gauge3.set("7");
     gauge4.set("8");
+    timer1.update(1L);
+    timer2.update(2L);
+    timer3.update(3L);
     assertEquals(counter1.getCount(), 1);
     assertEquals(counter2.getCount(), 2);
     assertEquals(counter3.getCount(), 4);
@@ -51,5 +57,8 @@ public class TestNoOpMetricsRegistry {
     assertEquals(gauge2.getValue(), "6");
     assertEquals(gauge3.getValue(), "7");
     assertEquals(gauge4.getValue(), "8");
+    assertEquals(timer1.getSnapshot().getAverage(), 1, 0);
+    assertEquals(timer2.getSnapshot().getAverage(), 2, 0);
+    assertEquals(timer3.getSnapshot().getAverage(), 3, 0);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
index 8043f37..e5d6b1e 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
@@ -23,7 +23,7 @@ import org.apache.samza.container.SamzaContainerMetrics
 
 /**
  * MetricsHelper is a little helper class to make it easy to register and
- * manage counters and gauges.
+ * manage counters, gauges and timers.
  */
 trait MetricsHelper {
   val group = this.getClass.getName
@@ -48,6 +48,10 @@ trait MetricsHelper {
     })
   }
 
+  def newTimer(name: String) = {
+    registry.newTimer(group, (getPrefix + name).toLowerCase)
+  }
+
   /**
    * Returns a prefix for metric names.
    */

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
index da83f20..aac241b 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
@@ -62,6 +62,19 @@ class MetricsRegistryMap(val name: String) extends ReadableMetricsRegistry with
     newGauge(group, new Gauge[T](name, value))
   }
 
+  def newTimer(group: String, timer: Timer) = { // Stores `timer` under `group` and returns whichever instance the registry holds for that name.
+    debug("Add new timer %s %s %s." format (group, timer.getName, timer))
+    putAndGetGroup(group).putIfAbsent(timer.getName, timer) // an entry already stored under this name is kept, not replaced
+    val realTimer = metrics.get(group).get(timer.getName).asInstanceOf[Timer] // NOTE(review): cast would fail if the stored entry is not a Timer - confirm names are unique per type
+    listeners.foreach(_.onTimer(group, realTimer)) // listeners are notified on every call, including when the name was already present
+    realTimer
+  }
+
+  def newTimer(group: String, name: String) = { // Convenience overload: builds a Timer from `name` and registers it in `group`.
+    debug("Creating new timer %s %s." format (group, name))
+    newTimer(group, new Timer(name)) // delegates to the (group, Timer) overload and returns its result
+  }
+
   private def putAndGetGroup(group: String) = {
     metrics.putIfAbsent(group, new ConcurrentHashMap[String, Metric])
     metrics.get(group)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
index 8814e68..d66efc2 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
@@ -26,6 +26,7 @@ import javax.management.ObjectName
 import org.apache.samza.config.Config
 import org.apache.samza.metrics.Counter
 import org.apache.samza.metrics.Gauge
+import org.apache.samza.metrics.Timer
 import org.apache.samza.metrics.MetricsReporter
 import org.apache.samza.metrics.MetricsReporterFactory
 import org.apache.samza.metrics.ReadableMetricsRegistry
@@ -47,8 +48,9 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging {
         registry.getGroup(group).foreach {
           case (name, metric) =>
             metric.visit(new MetricsVisitor {
-              def counter(counter: Counter) = registerBean(new JmxCounter(counter, getObjectName(group, name, sources(registry))));
+              def counter(counter: Counter) = registerBean(new JmxCounter(counter, getObjectName(group, name, sources(registry))))
               def gauge[T](gauge: Gauge[T]) = registerBean(new JmxGauge(gauge.asInstanceOf[Gauge[Object]], getObjectName(group, name, sources(registry))))
+              def timer(timer: Timer) = registerBean(new JmxTimer(timer, getObjectName(group, name, sources(registry)))) // timers are wrapped in a JmxTimer, parallel to the counter and gauge cases
             })
         }
       })
@@ -66,6 +68,10 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging {
         def onGauge(group: String, gauge: Gauge[_]) {
           registerBean(new JmxGauge(gauge.asInstanceOf[Gauge[Object]], getObjectName(group, gauge.getName, source)))
         }
+
+        def onTimer(group: String, timer: Timer) { // Listener hook for timers: wraps the timer in a JmxTimer and hands it to registerBean.
+          registerBean(new JmxTimer(timer, getObjectName(group, timer.getName, source))) // object name is built from group, timer name and source
+        }
       }
     } else {
       warn("Trying to re-register a registry for source %s. Ignoring." format source)
@@ -137,6 +143,15 @@ class JmxCounter(c: org.apache.samza.metrics.Counter, on: ObjectName) extends Jm
   def objectName = on
 }
 
+trait JmxTimerMBean extends MetricMBean { // MBean interface for timers, implemented by JmxTimer.
+  def getAverageTime(): Double // the only timer statistic this interface declares
+}
+
+class JmxTimer(t: org.apache.samza.metrics.Timer, on: ObjectName) extends JmxTimerMBean { // Adapts a Timer `t` to JmxTimerMBean under object name `on`.
+  def getAverageTime() = t.getSnapshot().getAverage() // calls getSnapshot() on every read and returns that snapshot's average
+  def objectName = on
+}
+
 class JmxReporterFactory extends MetricsReporterFactory with Logging {
   def getMetricsReporter(name: String, containerName: String, config: Config) = {
     info("Creating JMX reporter with  name %s." format name)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
index 9a56754..319c74d 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
@@ -25,6 +25,7 @@ import scala.collection.JavaConversions._
 import grizzled.slf4j.Logging
 import org.apache.samza.metrics.Counter
 import org.apache.samza.metrics.Gauge
+import org.apache.samza.metrics.Timer
 import org.apache.samza.metrics.MetricsReporter
 import org.apache.samza.metrics.MetricsVisitor
 import org.apache.samza.metrics.ReadableMetricsRegistry
@@ -122,6 +123,7 @@ class MetricsSnapshotReporter(
             metric.visit(new MetricsVisitor {
               def counter(counter: Counter) = groupMsg.put(name, counter.getCount: java.lang.Long)
               def gauge[T](gauge: Gauge[T]) = groupMsg.put(name, gauge.getValue.asInstanceOf[Object])
+              def timer(timer: Timer) = groupMsg.put(name, timer.getSnapshot().getAverage(): java.lang.Double) // a timer is reported as one boxed Double: its snapshot average
             })
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e603a279/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
index d10dc38..27fbe2d 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
@@ -54,6 +54,9 @@ class ApplicationMasterRestServlet(config: Config, state: SamzaAppMasterState, r
 
             def gauge[T](gauge: Gauge[T]) =
               groupMap.put(gauge.getName, gauge.getValue.asInstanceOf[java.lang.Object])
+
+            def timer(timer: Timer) = // Visitor case for timers: stores the snapshot average under the timer's own name.
+              groupMap.put(timer.getName, timer.getSnapshot().getAverage: java.lang.Double) // boxed as java.lang.Double for the map value
           })
       }