You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2012/11/07 19:13:12 UTC

svn commit: r1406744 [1/2] - in /giraph/trunk: ./ giraph/src/main/java/org/apache/giraph/ giraph/src/main/java/org/apache/giraph/comm/netty/ giraph/src/main/java/org/apache/giraph/comm/netty/handler/ giraph/src/main/java/org/apache/giraph/graph/ giraph...

Author: apresta
Date: Wed Nov  7 18:13:10 2012
New Revision: 1406744

URL: http://svn.apache.org/viewvc?rev=1406744&view=rev
Log:
GIRAPH-407: Metrics Update (nitay via apresta)

Added:
    giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/GiraphMetricsRegistry.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ResetSuperstepMetricsObserver.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ValueGauge.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Times.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestInfo.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallable.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/FakeTime.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/MemoryUtils.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/SystemTime.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/TestPredicateLock.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/BspUtilsTest.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Wed Nov  7 18:13:10 2012
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-407: Metrics Update (nitay via apresta)
+
   GIRAPH-404: More SendMessageCache improvements (majakabiljo)
 
   GIRAPH-412: Checkstyle error from Giraph-403 (majakabiljo) 

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java Wed Nov  7 18:13:10 2012
@@ -28,7 +28,6 @@ import org.apache.giraph.graph.VertexOut
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.graph.WorkerContext;
 import org.apache.giraph.graph.partition.GraphPartitionerFactory;
-import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.hadoop.conf.Configuration;
 
 /**
@@ -131,6 +130,15 @@ public class GiraphConfiguration extends
   /** Default poll msecs (30 seconds) */
   public static final int POLL_MSECS_DEFAULT = 30 * 1000;
 
+  /** Enable the Metrics system */
+  public static final String METRICS_ENABLE = "giraph.metrics.enable";
+
+  /** Whether to dump all metrics when the job finishes */
+  public static final String METRICS_DUMP_AT_END = "giraph.metrics.dump.at.end";
+
+  /** Whether to print superstep metrics */
+  public static final String METRICS_SUPERSTEP_PRINT = "giraph.metrics.print";
+
   /**
    *  ZooKeeper comma-separated list (if not set,
    *  will start up ZooKeeper locally)
@@ -796,7 +804,16 @@ public class GiraphConfiguration extends
    * @return true if we should dump metrics, false otherwise.
    */
   public boolean dumpMetricsAtEnd() {
-    return getBoolean(GiraphMetrics.DUMP_AT_END, false);
+    return getBoolean(METRICS_DUMP_AT_END, false);
+  }
+
+  /**
+   * Should we print superstep metrics at end of superstep.
+   *
+   * @return true if we should print metrics, false otherwise.
+   */
+  public boolean printSuperstepMetrics() {
+    return getBoolean(METRICS_SUPERSTEP_PRINT, false);
   }
 
   /**

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java Wed Nov  7 18:13:10 2012
@@ -18,28 +18,34 @@
 
 package org.apache.giraph.comm.netty;
 
-import java.text.DecimalFormat;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.yammer.metrics.core.Histogram;
-import com.yammer.metrics.core.Meter;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.MetricGroup;
-import org.apache.log4j.Logger;
+import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
+import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 import org.apache.giraph.utils.SystemTime;
 import org.apache.giraph.utils.Time;
+import org.apache.log4j.Logger;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.ChannelEvent;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelHandler;
 
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.NoOpHistogram;
+import com.yammer.metrics.core.NoOpMeter;
+
+import java.text.DecimalFormat;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * Keep track of the bytes sent/received and provide some metrics when
  * desired as part of the Netty Channel stack.
  */
-public class ByteCounter extends SimpleChannelHandler {
+public class ByteCounter extends SimpleChannelHandler implements
+    ResetSuperstepMetricsObserver {
   /** Megabyte in bytes */
   public static final double MEGABYTE = 1024f * 1024f;
   /** Helper to format the doubles */
@@ -64,25 +70,30 @@ public class ByteCounter extends SimpleC
   private final AtomicLong metricsWindowLastUpdatedMsecs = new AtomicLong();
 
   // Metrics
-  /** meter of requests sent */
-  private final Meter sentRequestsMeter;
+  /** Meter of requests sent */
+  private Meter sentRequestsMeter = NoOpMeter.INSTANCE;
   /** Histogram of bytes sent */
-  private final Histogram sentBytesHist;
+  private Histogram sentBytesHist = NoOpHistogram.INSTANCE;
   /** Meter of requests received */
-  private final Meter receivedRequestsMeter;
+  private Meter receivedRequestsMeter = NoOpMeter.INSTANCE;
   /** Histogram of bytes received */
-  private final Histogram receivedBytesHist;
+  private Histogram receivedBytesHist = NoOpHistogram.INSTANCE;
 
   /** Constructor */
   public ByteCounter() {
     // Initialize Metrics
-    sentRequestsMeter = GiraphMetrics.getMeter(MetricGroup.NETWORK,
-        "sent-requests", "requests",  TimeUnit.SECONDS);
-    sentBytesHist = GiraphMetrics.getHistogram(MetricGroup.NETWORK,
+    GiraphMetrics.getInstance().addSuperstepResetObserver(this);
+  }
+
+  @Override
+  public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
+    sentRequestsMeter = superstepMetrics.getMeter(MetricGroup.NETWORK,
+        "sent-requests", "requests", TimeUnit.SECONDS);
+    sentBytesHist = superstepMetrics.getHistogram(MetricGroup.NETWORK,
         "sent-bytes", false);
-    receivedRequestsMeter = GiraphMetrics.getMeter(MetricGroup.NETWORK,
+    receivedRequestsMeter = superstepMetrics.getMeter(MetricGroup.NETWORK,
         "received-requests", "request", TimeUnit.SECONDS);
-    receivedBytesHist = GiraphMetrics.getHistogram(MetricGroup.NETWORK,
+    receivedBytesHist = superstepMetrics.getHistogram(MetricGroup.NETWORK,
         "received-bytes", false);
   }
 

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java Wed Nov  7 18:13:10 2012
@@ -42,6 +42,7 @@ import org.apache.giraph.graph.partition
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.MetricGroup;
+import org.apache.giraph.metrics.ValueGauge;
 import org.apache.giraph.utils.PairList;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -49,7 +50,6 @@ import org.apache.hadoop.mapreduce.Mappe
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Maps;
-import com.yammer.metrics.core.Histogram;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -92,9 +92,10 @@ public class NettyWorkerClientRequestPro
   private final CentralizedServiceWorker<I, V, E, M> serviceWorker;
   /** Server data from the server (used for local requests) */
   private final ServerData<I, V, E, M> serverData;
-  // Metrics
-  /** histogram of messages sent in a superstep */
-  private final Histogram msgsSentInSuperstepHist;
+
+  // Per-Superstep Metrics
+  /** Number of messages sent in a superstep */
+  private final ValueGauge<Long> msgsSentInSuperstep;
 
   /**
    * Constructor.
@@ -123,9 +124,11 @@ public class NettyWorkerClientRequestPro
     this.serviceWorker = serviceWorker;
     this.serverData = serviceWorker.getServerData();
 
-    // Initialize Metrics
-    msgsSentInSuperstepHist = GiraphMetrics.getHistogram(
-        MetricGroup.NETWORK, "superstep-msgs-sent");
+    // Per-Superstep Metrics.
+    // Since this object is not long lived we just initialize the metrics here.
+    GiraphMetrics gmr = GiraphMetrics.getInstance();
+    msgsSentInSuperstep = new ValueGauge<Long>(gmr.perSuperstep(),
+        MetricGroup.NETWORK, "msgs-sent");
   }
 
   @Override
@@ -352,7 +355,7 @@ public class NettyWorkerClientRequestPro
 
   @Override
   public long resetMessageCount() {
-    msgsSentInSuperstepHist.update(totalMsgsSentInSuperstep);
+    msgsSentInSuperstep.set(totalMsgsSentInSuperstep);
     long messagesSentInSuperstep = totalMsgsSentInSuperstep;
     totalMsgsSentInSuperstep = 0;
     return messagesSentInSuperstep;

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java Wed Nov  7 18:13:10 2012
@@ -23,8 +23,9 @@ import org.apache.giraph.comm.netty.Byte
 import org.apache.giraph.comm.requests.RequestType;
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.utils.ReflectionUtils;
-
 import org.apache.giraph.utils.SystemTime;
+import org.apache.giraph.utils.Time;
+import org.apache.giraph.utils.Times;
 import org.apache.log4j.Logger;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBufferInputStream;
@@ -39,6 +40,8 @@ public class RequestDecoder extends OneT
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(RequestDecoder.class);
+  /** Time class to use */
+  private static final Time TIME = SystemTime.getInstance();
   /** Configuration */
   private final ImmutableClassesGiraphConfiguration conf;
   /** Byte counter to output */
@@ -73,7 +76,7 @@ public class RequestDecoder extends OneT
     }
 
     if (LOG.isDebugEnabled()) {
-      startDecodingNanoseconds = SystemTime.getInstance().getNanoseconds();
+      startDecodingNanoseconds = TIME.getNanoseconds();
     }
 
     // Decode the request
@@ -92,8 +95,7 @@ public class RequestDecoder extends OneT
           ", requestId " + writableRequest.getRequestId() +
           ", " +  writableRequest.getType() + ", with size " +
           buffer.array().length + " took " +
-          SystemTime.getInstance().getNanosecondsSince(
-              startDecodingNanoseconds) + " ns");
+          Times.getNanosSince(TIME, startDecodingNanoseconds) + " ns");
     }
 
     return writableRequest;

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java Wed Nov  7 18:13:10 2012
@@ -20,7 +20,8 @@ package org.apache.giraph.comm.netty.han
 
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.utils.SystemTime;
-
+import org.apache.giraph.utils.Time;
+import org.apache.giraph.utils.Times;
 import org.apache.log4j.Logger;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBufferOutputStream;
@@ -33,9 +34,10 @@ import org.jboss.netty.handler.codec.one
  * Requests have a request type and an encoded request.
  */
 public class RequestEncoder extends OneToOneEncoder {
+  /** Time class to use */
+  private static final Time TIME = SystemTime.getInstance();
   /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(RequestEncoder.class);
+  private static final Logger LOG = Logger.getLogger(RequestEncoder.class);
   /** Holds the place of the message length until known */
   private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
   /** Buffer starting size */
@@ -62,7 +64,7 @@ public class RequestEncoder extends OneT
 
     // Encode the request
     if (LOG.isDebugEnabled()) {
-      startEncodingNanoseconds = SystemTime.getInstance().getNanoseconds();
+      startEncodingNanoseconds = TIME.getNanoseconds();
     }
     WritableRequest writableRequest = (WritableRequest) msg;
     ChannelBufferOutputStream outputStream =
@@ -83,8 +85,7 @@ public class RequestEncoder extends OneT
           "requestId " + writableRequest.getRequestId() +
           ", size = " + encodedBuffer.writerIndex() + ", " +
           writableRequest.getType() + " took " +
-          SystemTime.getInstance().getNanosecondsSince(
-              startEncodingNanoseconds) + " ns");
+          Times.getNanosSince(TIME, startEncodingNanoseconds) + " ns");
     }
     return encodedBuffer;
   }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestInfo.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestInfo.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestInfo.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestInfo.java Wed Nov  7 18:13:10 2012
@@ -18,17 +18,20 @@
 
 package org.apache.giraph.comm.netty.handler;
 
-import java.net.InetSocketAddress;
-import java.util.Date;
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.utils.SystemTime;
 import org.apache.giraph.utils.Time;
 import org.jboss.netty.channel.ChannelFuture;
 
+import java.net.InetSocketAddress;
+import java.util.Date;
+
 /**
  * Help track requests throughout the system
  */
 public class RequestInfo {
+  /** Time class to use */
+  private static final Time TIME = SystemTime.getInstance();
   /** Destination of the request */
   private final InetSocketAddress destinationAddress;
   /** When the request was started */
@@ -48,7 +51,7 @@ public class RequestInfo {
                      WritableRequest request) {
     this.destinationAddress = destinationAddress;
     this.request = request;
-    this.startedNanos = SystemTime.getInstance().getNanoseconds();
+    this.startedNanos = TIME.getNanoseconds();
   }
 
   public InetSocketAddress getDestinationAddress() {
@@ -70,7 +73,7 @@ public class RequestInfo {
    * @return Nanoseconds since the request was started
    */
   public long getElapsedNanos() {
-    return SystemTime.getInstance().getNanoseconds() - startedNanos;
+    return TIME.getNanoseconds() - startedNanos;
   }
 
   /**

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java Wed Nov  7 18:13:10 2012
@@ -21,6 +21,8 @@ package org.apache.giraph.comm.netty.han
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.utils.SystemTime;
+import org.apache.giraph.utils.Time;
+import org.apache.giraph.utils.Times;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 import org.jboss.netty.buffer.ChannelBuffer;
@@ -40,6 +42,8 @@ public abstract class RequestServerHandl
     SimpleChannelUpstreamHandler {
   /** Number of bytes in the encoded response */
   public static final int RESPONSE_BYTES = 13;
+  /** Time class to use */
+  private static Time TIME = SystemTime.getInstance();
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(RequestServerHandler.class);
@@ -95,7 +99,7 @@ public abstract class RequestServerHandl
         writableRequest.getClientId(),
         writableRequest.getRequestId())) {
       if (LOG.isDebugEnabled()) {
-        startProcessingNanoseconds = SystemTime.getInstance().getNanoseconds();
+        startProcessingNanoseconds = TIME.getNanoseconds();
       }
       processRequest((R) writableRequest);
       if (LOG.isDebugEnabled()) {
@@ -103,8 +107,7 @@ public abstract class RequestServerHandl
             writableRequest.getClientId() + ", " +
             "requestId " + writableRequest.getRequestId() +
             ", " +  writableRequest.getType() + " took " +
-            SystemTime.getInstance().getNanosecondsSince(
-                startProcessingNanoseconds) + " ns");
+            Times.getNanosSince(TIME, startProcessingNanoseconds) + " ns");
       }
       alreadyDone = 0;
     } else {

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Wed Nov  7 18:13:10 2012
@@ -30,7 +30,15 @@ import org.apache.giraph.graph.partition
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.giraph.graph.partition.PartitionStats;
 import org.apache.giraph.graph.partition.PartitionUtils;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.MetricGroup;
+import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
+import org.apache.giraph.metrics.SuperstepMetricsRegistry;
+import org.apache.giraph.metrics.ValueGauge;
 import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.utils.SystemTime;
+import org.apache.giraph.utils.Time;
+import org.apache.giraph.utils.Times;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.giraph.zk.BspEvent;
 import org.apache.giraph.zk.PredicateLock;
@@ -89,7 +97,8 @@ import java.util.concurrent.Executors;
 public class BspServiceMaster<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     extends BspService<I, V, E, M>
-    implements CentralizedServiceMaster<I, V, E, M> {
+    implements CentralizedServiceMaster<I, V, E, M>,
+    ResetSuperstepMetricsObserver {
   /** Counter group name for the Giraph statistics */
   public static final String GIRAPH_STATS_COUNTER_GROUP_NAME = "Giraph Stats";
   /** Print worker names only if there are 10 workers left */
@@ -99,6 +108,8 @@ public class BspServiceMaster<I extends 
       "giraph.inputSplitThreadCount";
   /** Default number of threads to use when writing input splits to zookeeper */
   public static final int DEFAULT_INPUT_SPLIT_THREAD_COUNT = 1;
+  /** Time instance to use for timing */
+  private static final Time TIME = SystemTime.getInstance();
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(BspServiceMaster.class);
   /** Superstep counter */
@@ -155,6 +166,10 @@ public class BspServiceMaster<I extends 
   /** Limit locality information added to each InputSplit znode */
   private final int localityLimit = 5;
 
+  // Per-Superstep Metrics
+  /** MasterCompute time in msec */
+  private ValueGauge<Long> masterComputeMs;
+
   /**
    * Constructor for setting up the master.
    *
@@ -183,7 +198,7 @@ public class BspServiceMaster<I extends 
             100.0f);
     msecsPollPeriod =
         getConfiguration().getInt(GiraphConfiguration.POLL_MSECS,
-            GiraphConfiguration.POLL_MSECS_DEFAULT);
+                                  GiraphConfiguration.POLL_MSECS_DEFAULT);
     maxPollAttempts =
         getConfiguration().getInt(GiraphConfiguration.POLL_ATTEMPTS,
             GiraphConfiguration.POLL_ATTEMPTS_DEFAULT);
@@ -192,6 +207,14 @@ public class BspServiceMaster<I extends 
         GiraphConfiguration.PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT);
     masterGraphPartitioner =
         getGraphPartitionerFactory().createMasterGraphPartitioner();
+
+    GiraphMetrics.getInstance().addSuperstepResetObserver(this);
+  }
+
+  @Override
+  public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
+    masterComputeMs = new ValueGauge<Long>(superstepMetrics, MetricGroup.USER,
+        "master-compute-call");
   }
 
   @Override
@@ -791,7 +814,7 @@ public class BspServiceMaster<I extends 
         if (masterChildArr.get(0).equals(myBid)) {
           currentMasterTaskPartitionCounter.increment(
               getTaskPartition() -
-              currentMasterTaskPartitionCounter.getValue());
+                  currentMasterTaskPartitionCounter.getValue());
           masterCompute = getConfiguration().createMasterCompute();
           aggregatorHandler = new MasterAggregatorHandler(getConfiguration(),
               getContext());
@@ -1480,7 +1503,9 @@ public class BspServiceMaster<I extends 
             "runMasterCompute: Failed in access", e);
       }
     }
+    long masterComputeBeginMs = TIME.getMilliseconds();
     masterCompute.compute();
+    masterComputeMs.set(Times.getMsSince(TIME, masterComputeBeginMs));
   }
 
   /**

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Wed Nov  7 18:13:10 2012
@@ -39,9 +39,15 @@ import org.apache.giraph.graph.partition
 import org.apache.giraph.graph.partition.WorkerGraphPartitioner;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.MetricGroup;
+import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
+import org.apache.giraph.metrics.SuperstepMetricsRegistry;
+import org.apache.giraph.metrics.ValueGauge;
 import org.apache.giraph.utils.LoggerUtils;
 import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.utils.SystemTime;
+import org.apache.giraph.utils.Time;
+import org.apache.giraph.utils.Times;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.giraph.zk.BspEvent;
 import org.apache.giraph.zk.PredicateLock;
@@ -64,9 +70,6 @@ import org.json.JSONObject;
 
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.core.Timer;
-import com.yammer.metrics.core.TimerContext;
 import net.iharder.Base64;
 
 import java.io.ByteArrayOutputStream;
@@ -99,7 +102,12 @@ import java.util.concurrent.Future;
 public class BspServiceWorker<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     extends BspService<I, V, E, M>
-    implements CentralizedServiceWorker<I, V, E, M> {
+    implements CentralizedServiceWorker<I, V, E, M>,
+    ResetSuperstepMetricsObserver {
+  /** Name of gauge for time spent waiting on other workers */
+  public static final String GAUGE_WAITING_TIME = "waiting-ms";
+  /** Time instance to use for timing */
+  private static final Time TIME = SystemTime.getInstance();
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(BspServiceWorker.class);
   /** My process health znode */
@@ -135,9 +143,11 @@ public class BspServiceWorker<I extends 
   /** Handler for aggregators */
   private final WorkerAggregatorHandler aggregatorHandler;
 
-
-  /** timer waiting for other workers */
-  private final Timer waitTimer;
+  // Per-Superstep Metrics
+  /** msec spent in WorkerContext#postSuperstep */
+  private ValueGauge<Long> wcPostSuperstepMs;
+  /** msec spent waiting for other workers */
+  private ValueGauge<Long> waitMs;
 
   /**
    * Constructor for setting up the worker.
@@ -171,29 +181,20 @@ public class BspServiceWorker<I extends 
 
     workerInfo = new WorkerInfo(
         getHostname(), getTaskPartition(), workerServer.getPort());
-    this.workerContext =
-        getConfiguration().createWorkerContext(null);
+    this.workerContext = getConfiguration().createWorkerContext(null);
 
     aggregatorHandler =
         new WorkerAggregatorHandler(this, getConfiguration(), context);
 
-    waitTimer = GiraphMetrics.getTimer(MetricGroup.NETWORK, "waiting");
-
-    initGauges();
+    GiraphMetrics.getInstance().addSuperstepResetObserver(this);
   }
 
-  /**
-   * Initialize Metrics used by this class
-   */
-  private void initGauges() {
-    GiraphMetrics.getGauge(MetricGroup.COMPUTE, "partition-map-size",
-                           new Gauge<Integer>() {
-        @Override
-        public Integer value() {
-          return getPartitionStore().getNumPartitions();
-        }
-      }
-    );
+  @Override
+  public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
+    waitMs = new ValueGauge<Long>(superstepMetrics, MetricGroup.NETWORK,
+        GAUGE_WAITING_TIME);
+    wcPostSuperstepMs = new ValueGauge<Long>(superstepMetrics,
+        MetricGroup.USER, "worker-context-post-superstep-ms");
   }
 
   @Override
@@ -727,7 +728,9 @@ else[HADOOP_NON_SECURE]*/
 
     if (getSuperstep() != INPUT_SUPERSTEP) {
       getWorkerContext().setGraphState(graphState);
+      long postSuperstepBeginMs = TIME.getMilliseconds();
       getWorkerContext().postSuperstep();
+      wcPostSuperstepMs.set(Times.getMsSince(TIME, postSuperstepBeginMs));
       getContext().progress();
     }
 
@@ -785,7 +788,7 @@ else[HADOOP_NON_SECURE]*/
     String superstepFinishedNode =
         getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
 
-    TimerContext waitTimerContext = waitTimer.time();
+    long waitBeginMs = TIME.getMilliseconds();
     try {
       while (getZkExt().exists(superstepFinishedNode, true) == null) {
         getSuperstepFinishedEvent().waitForever();
@@ -800,7 +803,8 @@ else[HADOOP_NON_SECURE]*/
           "finishSuperstep: Failed while waiting for master to " +
               "signal completion of superstep " + getSuperstep(), e);
     }
-    waitTimerContext.stop();
+
+    waitMs.set(Times.getMsSince(TIME, waitBeginMs));
 
     GlobalStats globalStats = new GlobalStats();
     WritableUtils.readFieldsFromZnode(
@@ -814,6 +818,7 @@ else[HADOOP_NON_SECURE]*/
         getGraphMapper().getMapFunctions().toString() +
         " - Attempt=" + getApplicationAttempt() +
         ", Superstep=" + getSuperstep());
+
     return new FinishedSuperstepStats(
         globalStats.getHaltComputation(),
         globalStats.getVertexCount(),
@@ -897,7 +902,7 @@ else[HADOOP_NON_SECURE]*/
     }
 
     if (getConfiguration().dumpMetricsAtEnd()) {
-      GiraphMetrics.dumpToStdout();
+      GiraphMetrics.getInstance().dumpToStdout();
     }
 
     // Preferably would shut down the service only after

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java Wed Nov  7 18:13:10 2012
@@ -29,9 +29,10 @@ import org.apache.giraph.graph.partition
 import org.apache.giraph.graph.partition.PartitionStats;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.MetricGroup;
-import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.SystemTime;
 import org.apache.giraph.utils.Time;
+import org.apache.giraph.utils.Times;
+import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.TimedLogger;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -59,6 +60,8 @@ import java.util.concurrent.Callable;
  */
 public class ComputeCallable<I extends WritableComparable, V extends Writable,
     E extends Writable, M extends Writable> implements Callable {
+  /** Name of timer for compute call */
+  public static final String TIMER_COMPUTE_ONE = "compute-one";
   /** Class logger */
   private static final Logger LOG  = Logger.getLogger(ComputeCallable.class);
   /** Class time object */
@@ -83,9 +86,9 @@ public class ComputeCallable<I extends W
   /** Get the start time in nanos */
   private final long startNanos = TIME.getNanoseconds();
 
-  // Metrics
+  // Per-Superstep Metrics
   /** Timer for single compute() call */
-  private Timer computeOneTimer;
+  private final Timer computeOneTimer;
 
   /**
    * Constructor
@@ -111,9 +114,11 @@ public class ComputeCallable<I extends W
     // Will be replaced later in call() for locality
     this.graphState = graphState;
 
-    // Metrics
-    computeOneTimer = GiraphMetrics.getTimer(MetricGroup.COMPUTE,
-                                             "compute-one");
+    GiraphMetrics metrics = GiraphMetrics.getInstance();
+    // Normally we would use ResetSuperstepMetricsObserver but this class is
+    // not long-lived, so just instantiating in the constructor is good enough.
+    computeOneTimer = metrics.perSuperstep().getTimer(MetricGroup.COMPUTE,
+        TIMER_COMPUTE_ONE);
   }
 
   @Override
@@ -155,7 +160,7 @@ public class ComputeCallable<I extends W
     }
 
     if (LOG.isInfoEnabled()) {
-      float seconds = TIME.getNanosecondsSince(startNanos) /
+      float seconds = Times.getNanosSince(TIME, startNanos) /
           Time.NS_PER_SECOND_AS_FLOAT;
       LOG.info("call: Computation took " + seconds + " secs for "  +
           partitionStatsList.size() + " partitions on superstep " +

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallable.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallable.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallable.java Wed Nov  7 18:13:10 2012
@@ -20,6 +20,7 @@ package org.apache.giraph.graph;
 
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.GiraphMetricsRegistry;
 import org.apache.giraph.metrics.MetricGroup;
 import org.apache.giraph.utils.LoggerUtils;
 import org.apache.giraph.utils.MemoryUtils;
@@ -89,8 +90,9 @@ public class EdgeInputSplitsCallable<I e
     inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
 
     // Initialize Metrics
-    edgesLoadedCounter = GiraphMetrics.getCounter(MetricGroup.IO,
-        "edges-loaded");
+    GiraphMetricsRegistry jobMetrics = GiraphMetrics.getInstance().perJob();
+    edgesLoadedCounter = jobMetrics.getCounter(MetricGroup.IO,
+        COUNTER_EDGES_LOADED);
   }
 
   /**

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java Wed Nov  7 18:13:10 2012
@@ -26,10 +26,17 @@ import org.apache.giraph.comm.messages.M
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.giraph.graph.partition.PartitionStats;
 import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.GiraphMetricsRegistry;
 import org.apache.giraph.metrics.MetricGroup;
+import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
+import org.apache.giraph.metrics.SuperstepMetricsRegistry;
+import org.apache.giraph.metrics.ValueGauge;
 import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.ProgressableUtils;
 import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.SystemTime;
+import org.apache.giraph.utils.Time;
+import org.apache.giraph.utils.Times;
 import org.apache.giraph.zk.ZooKeeperManager;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
@@ -44,8 +51,6 @@ import org.apache.log4j.PatternLayout;
 
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.yammer.metrics.core.Timer;
-import com.yammer.metrics.core.TimerContext;
 
 import java.io.IOException;
 import java.lang.reflect.Type;
@@ -74,11 +79,24 @@ import java.util.concurrent.Future;
 @SuppressWarnings("rawtypes")
 public class GraphMapper<I extends WritableComparable, V extends Writable,
     E extends Writable, M extends Writable> extends
-    Mapper<Object, Object, Object, Object> {
+    Mapper<Object, Object, Object, Object> implements
+    ResetSuperstepMetricsObserver {
   static {
     Configuration.addDefaultResource("giraph-site.xml");
   }
 
+  /** Name of metric for superstep time in msec */
+  public static final String GAUGE_SUPERSTEP_TIME = "superstep-time-ms";
+  /** Name of metric for compute on all vertices in msec */
+  public static final String GAUGE_COMPUTE_ALL = "compute-all-ms";
+  /** Name of metric for time from begin compute to first message sent */
+  public static final String GAUGE_TIME_TO_FIRST_MSG =
+      "time-to-first-message-ms";
+  /** Name of metric for time from first message till last message flushed */
+  public static final String GAUGE_COMMUNICATION_TIME = "communication-time-ms";
+
+  /** Time instance used for timing in this class */
+  private static final Time TIME = SystemTime.getInstance();
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(GraphMapper.class);
   /** Coordination service worker */
@@ -102,23 +120,27 @@ public class GraphMapper<I extends Writa
   /** Total number of edges in the graph (at this time) */
   private long numEdges = -1;
 
-  // Metrics
-  /** Timer for how long superstep took */
-  private Timer superstepTimer;
-  /** Timer for all compute() calls in a superstep */
-  private Timer computeAllTimer;
-  /** Timer for exchanging vertexes */
-  private Timer exchangeVertexPartitionsTimer;
+  // Per-Job Metrics
+  /** Time in msec for WorkerContext#preApplication() */
+  private ValueGauge<Long> wcPreAppMs;
+  /** Time in msec for WorkerContext#postApplication() */
+  private ValueGauge<Long> wcPostAppMs;
+
+  // Per-Superstep Metrics
+  /** Time in msec for how long superstep took */
+  private ValueGauge<Long> superstepMs;
+  /** Time in msec for all compute() calls in a superstep */
+  private ValueGauge<Long> computeAllMs;
+  /** Time in msec when computation started */
+  private long computeAllBeginMs;
   /** Milliseconds from starting compute to sending first message */
-  private Timer timeToFirstMessage;
-  /** Timer context used for computer msec from compute to first message */
-  private volatile TimerContext timeToFirstMessageContext;
+  private ValueGauge<Long> msecToFirstMessage;
+  /** Time in msec at which the first message was sent (0 if not recorded) */
+  private long firstMessageTimeMs;
   /** Time from first sent message till last message flushed. */
-  private Timer communicationTimer;
-  /** Timer context for communication timer. */
-  private TimerContext communicationTimerContext;
-  /** Lock for notifySentMessages() to make it thread safe */
-  private Object notifySentMsgLock = new Object();
+  private ValueGauge<Long> communicationTimer;
+  /** Time in msec for WorkerContext#preSuperstep() */
+  private ValueGauge<Long> wcPreSuperstepMs;
 
   /** What kinds of functions to run on this mapper */
   public enum MapFunctions {
@@ -314,11 +336,9 @@ public class GraphMapper<I extends Writa
 
     // Set up GiraphMetrics
     GiraphMetrics.init(context);
-    if (LOG.isInfoEnabled()) {
-      LOG.info("setup: Initialized metrics system");
-    }
+    GiraphMetrics.getInstance().addSuperstepResetObserver(this);
+    initJobMetrics();
     MemoryUtils.initMetrics();
-    initMetrics();
 
     // Do some initial setup (possibly starting up a Zookeeper service)
     context.setStatus("setup: Initializing Zookeeper services.");
@@ -422,19 +442,28 @@ public class GraphMapper<I extends Writa
   }
 
   /**
-   * Initialize Metrics used by this class.
+   * Initialize job-level metrics used by this class.
    */
-  private void initMetrics() {
-    superstepTimer = GiraphMetrics.getTimer(MetricGroup.COMPUTE,
-                                            "superstep-time");
-    computeAllTimer = GiraphMetrics.getTimer(MetricGroup.COMPUTE,
-                                             "compute-all");
-    exchangeVertexPartitionsTimer = GiraphMetrics.getTimer(MetricGroup.NETWORK,
-        "exchange-vertex-partitions");
-    timeToFirstMessage = GiraphMetrics.getTimer(MetricGroup.COMPUTE,
-        "time-to-first-message");
-    communicationTimer = GiraphMetrics.getTimer(MetricGroup.NETWORK,
-        "communication-time");
+  private void initJobMetrics() {
+    GiraphMetricsRegistry jobMetrics = GiraphMetrics.getInstance().perJob();
+    wcPreAppMs = new ValueGauge<Long>(jobMetrics, MetricGroup.USER,
+        "worker-context-pre-app-ms");
+    wcPostAppMs = new ValueGauge<Long>(jobMetrics, MetricGroup.USER,
+        "worker-context-post-app-ms");
+  }
+
+  @Override
+  public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
+    superstepMs = new ValueGauge<Long>(superstepMetrics, MetricGroup.COMPUTE,
+        GAUGE_SUPERSTEP_TIME);
+    computeAllMs = new ValueGauge<Long>(superstepMetrics, MetricGroup.COMPUTE,
+        GAUGE_COMPUTE_ALL);
+    msecToFirstMessage = new ValueGauge<Long>(superstepMetrics,
+        MetricGroup.NETWORK, GAUGE_TIME_TO_FIRST_MSG);
+    communicationTimer = new ValueGauge<Long>(superstepMetrics,
+        MetricGroup.NETWORK, GAUGE_COMMUNICATION_TIME);
+    wcPreSuperstepMs = new ValueGauge<Long>(superstepMetrics,
+        MetricGroup.USER, "worker-context-pre-superstep-ms");
   }
 
   /**
@@ -442,14 +471,13 @@ public class GraphMapper<I extends Writa
    */
   public void notifySentMessages() {
     // We are tracking the time between when the compute started and the first
-    // message get sent. We use null to flag that we have already recorded it.
-    TimerContext tmp = timeToFirstMessageContext;
-    if (tmp != null) {
-      synchronized (notifySentMsgLock) {
-        if (timeToFirstMessageContext != null) {
-          timeToFirstMessageContext.stop();
-          timeToFirstMessageContext = null;
-          communicationTimerContext = communicationTimer.time();
+    // message gets sent. A value of zero means it has not been recorded yet.
+    long tmp = firstMessageTimeMs;
+    if (tmp == 0) {
+      synchronized (msecToFirstMessage) {
+        if (firstMessageTimeMs == 0) {
+          firstMessageTimeMs = TIME.getMilliseconds();
+          msecToFirstMessage.set(firstMessageTimeMs - computeAllBeginMs);
         }
       }
     }
@@ -460,9 +488,10 @@ public class GraphMapper<I extends Writa
    * and are done waiting for all messages to send.
    */
   public void notifyFinishedCommunication() {
-    if (communicationTimerContext != null) {
-      communicationTimerContext.stop();
-      communicationTimerContext = null;
+    if (firstMessageTimeMs != 0) {
+      long commTimeMs = Times.getMsSince(TIME, firstMessageTimeMs);
+      communicationTimer.set(commTimeMs);
+      firstMessageTimeMs = 0;
     }
   }
 
@@ -479,6 +508,9 @@ public class GraphMapper<I extends Writa
       return;
     }
 
+    GiraphMetrics.getInstance().
+        resetSuperstepMetrics(BspService.INPUT_SUPERSTEP);
+
     if ((mapFunctions == MapFunctions.MASTER_ZOOKEEPER_ONLY) ||
         (mapFunctions == MapFunctions.MASTER_ONLY)) {
       if (LOG.isInfoEnabled()) {
@@ -507,6 +539,8 @@ public class GraphMapper<I extends Writa
     serviceWorker.getWorkerContext().setGraphState(
         new GraphState<I, V, E, M>(serviceWorker.getSuperstep(),
             numVertices, numEdges, context, this, null, aggregatorUsage));
+
+    long preAppBeginMs = TIME.getMilliseconds();
     try {
       serviceWorker.getWorkerContext().preApplication();
     } catch (InstantiationException e) {
@@ -518,6 +552,7 @@ public class GraphMapper<I extends Writa
       throw new RuntimeException(
           "map: preApplication failed in access", e);
     }
+    wcPreAppMs.set(Times.getMsSince(TIME, preAppBeginMs));
     context.progress();
 
     List<PartitionStats> partitionStatsList =
@@ -527,7 +562,9 @@ public class GraphMapper<I extends Writa
     FinishedSuperstepStats finishedSuperstepStats = null;
     do {
       final long superstep = serviceWorker.getSuperstep();
-      TimerContext superstepTimerContext = superstepTimer.time();
+      GiraphMetrics.getInstance().resetSuperstepMetrics(superstep);
+
+      long superstepBeginMs = TIME.getMilliseconds();
 
       GraphState<I, V, E, M> graphState =
           new GraphState<I, V, E, M>(superstep, numVertices, numEdges,
@@ -541,9 +578,7 @@ public class GraphMapper<I extends Writa
       }
       context.progress();
 
-      TimerContext exchangeTimerContext = exchangeVertexPartitionsTimer.time();
       serviceWorker.exchangeVertexPartitions(masterAssignedPartitionOwners);
-      exchangeTimerContext.stop();
 
       context.progress();
 
@@ -566,7 +601,9 @@ public class GraphMapper<I extends Writa
       serviceWorker.prepareSuperstep();
 
       serviceWorker.getWorkerContext().setGraphState(graphState);
+      long preSuperstepBeginMs = TIME.getMilliseconds();
       serviceWorker.getWorkerContext().preSuperstep();
+      wcPreSuperstepMs.set(Times.getMsSince(TIME, preSuperstepBeginMs));
       context.progress();
 
       MessageStoreByPartition<I, M> messageStore =
@@ -590,8 +627,7 @@ public class GraphMapper<I extends Writa
           computePartitionIdQueue.add(partitionId);
         }
 
-        TimerContext computeAllTimerContext = computeAllTimer.time();
-        timeToFirstMessageContext = timeToFirstMessage.time();
+        computeAllBeginMs = TIME.getMilliseconds();
 
         ExecutorService partitionExecutor =
             Executors.newFixedThreadPool(numThreads,
@@ -617,14 +653,19 @@ public class GraphMapper<I extends Writa
         }
         partitionExecutor.shutdown();
 
-        computeAllTimerContext.stop();
+        computeAllMs.set(Times.getMsSince(TIME, computeAllBeginMs));
       }
 
       finishedSuperstepStats =
           serviceWorker.finishSuperstep(graphState, partitionStatsList);
       numVertices = finishedSuperstepStats.getVertexCount();
       numEdges = finishedSuperstepStats.getEdgeCount();
-      superstepTimerContext.stop();
+
+      superstepMs.set(Times.getMsSince(TIME, superstepBeginMs));
+      if (conf.printSuperstepMetrics()) {
+        GiraphMetrics.getInstance().perSuperstep().printSummary();
+      }
+
     } while (!finishedSuperstepStats.getAllVerticesHalted());
     if (LOG.isInfoEnabled()) {
       LOG.info("map: BSP application done (global vertices marked done)");
@@ -633,7 +674,9 @@ public class GraphMapper<I extends Writa
     serviceWorker.getWorkerContext().setGraphState(
         new GraphState<I, V, E, M>(serviceWorker.getSuperstep(),
             numVertices, numEdges, context, this, null, aggregatorUsage));
+    long postAppBeginMs = TIME.getMilliseconds();
     serviceWorker.getWorkerContext().postApplication();
+    wcPostAppMs.set(Times.getMsSince(TIME, postAppBeginMs));
     context.progress();
   }
 

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java Wed Nov  7 18:13:10 2012
@@ -23,6 +23,7 @@ import org.apache.giraph.comm.WorkerClie
 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
 import org.apache.giraph.utils.SystemTime;
 import org.apache.giraph.utils.Time;
+import org.apache.giraph.utils.Times;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -55,9 +56,12 @@ import java.util.concurrent.Callable;
 public abstract class InputSplitsCallable<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     implements Callable<VertexEdgeCount> {
+  /** Name of counter for vertices loaded */
+  public static final String COUNTER_VERTICES_LOADED = "vertices-loaded";
+  /** Name of counter for edges loaded */
+  public static final String COUNTER_EDGES_LOADED = "edges-loaded";
   /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(InputSplitsCallable.class);
+  private static final Logger LOG = Logger.getLogger(InputSplitsCallable.class);
   /** Class time object */
   private static final Time TIME = SystemTime.getInstance();
   /** Configuration */
@@ -180,7 +184,7 @@ public abstract class InputSplitsCallabl
     }
 
     if (LOG.isInfoEnabled()) {
-      float seconds = TIME.getNanosecondsSince(startNanos) /
+      float seconds = Times.getNanosSince(TIME, startNanos) /
           Time.NS_PER_SECOND_AS_FLOAT;
       float verticesPerSecond = vertexEdgeCount.getVertexCount() / seconds;
       float edgesPerSecond = vertexEdgeCount.getEdgeCount() / seconds;

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java Wed Nov  7 18:13:10 2012
@@ -22,6 +22,7 @@ import org.apache.giraph.GiraphConfigura
 import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.bsp.SuperstepState;
+import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper.Context;
@@ -110,6 +111,7 @@ public class MasterThread<I extends Writ
           while (superstepState != SuperstepState.ALL_SUPERSTEPS_DONE) {
             long startSuperstepMillis = System.currentTimeMillis();
             cachedSuperstep = bspServiceMaster.getSuperstep();
+            GiraphMetrics.getInstance().resetSuperstepMetrics(cachedSuperstep);
             superstepState = bspServiceMaster.coordinateSuperstep();
             long superstepMillis = System.currentTimeMillis() -
                 startSuperstepMillis;

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java Wed Nov  7 18:13:10 2012
@@ -21,6 +21,7 @@ package org.apache.giraph.graph;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.GiraphMetricsRegistry;
 import org.apache.giraph.metrics.MetricGroup;
 import org.apache.giraph.utils.LoggerUtils;
 import org.apache.giraph.utils.MemoryUtils;
@@ -97,10 +98,11 @@ public class VertexInputSplitsCallable<I
     this.bspServiceWorker = bspServiceWorker;
 
     // Initialize Metrics
-    verticesLoadedCounter = GiraphMetrics.getCounter(MetricGroup.IO,
-        "vertices-loaded");
-    edgesLoadedCounter = GiraphMetrics.getCounter(MetricGroup.IO,
-        "edges-loaded");
+    GiraphMetricsRegistry jobMetrics = GiraphMetrics.getInstance().perJob();
+    verticesLoadedCounter = jobMetrics.getCounter(MetricGroup.IO,
+        COUNTER_VERTICES_LOADED);
+    edgesLoadedCounter = jobMetrics.getCounter(MetricGroup.IO,
+        COUNTER_EDGES_LOADED);
   }
 
   /**

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java Wed Nov  7 18:13:10 2012
@@ -17,224 +17,127 @@
  */
 package org.apache.giraph.metrics;
 
+import org.apache.giraph.graph.BspService;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Mapper;
 
-import com.google.common.base.Joiner;
-import com.yammer.metrics.core.Counter;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.core.Histogram;
-import com.yammer.metrics.core.Meter;
-import com.yammer.metrics.core.MetricName;
-import com.yammer.metrics.core.MetricPredicate;
-import com.yammer.metrics.core.MetricsRegistry;
-import com.yammer.metrics.core.Timer;
-import com.yammer.metrics.reporting.ConsoleReporter;
-import com.yammer.metrics.reporting.JmxReporter;
+import com.google.common.collect.Lists;
 
 import java.io.PrintStream;
-import java.util.concurrent.TimeUnit;
+import java.util.List;
 
 /**
- * Wrapper around {@link MetricsRegistry} to register metrics within a Giraph
- * job. After initializing, users can add {@link com.yammer.metrics.core.Metric}
- * and have it automatically piped into the configured outputs.
+ * Top level metrics class for using Yammer's metrics in Giraph.
  */
 public class GiraphMetrics {
-  /** Enable the Metrics system **/
-  public static final String ENABLE = "giraph.metrics.enable";
+  /** Singleton instance for everyone to use */
+  private static GiraphMetrics INSTANCE = new GiraphMetrics();
 
-  /** Enable the metrics on the console **/
-  public static final String CONSOLE_ENABLE = "giraph.metrics.console.enable";
+  /** registry for per-superstep metrics */
+  private final SuperstepMetricsRegistry perSuperstep;
 
-  /** Time period for metrics **/
-  public static final String CONSOLE_PERIOD = "giraph.metrics.console.period";
+  /** registry for per-job metrics */
+  private final GiraphMetricsRegistry perJob;
 
-  /** @{link TimeUnit} for metrics time period **/
-  public static final String CONSOLE_TIME_UNIT =
-    "giraph.metrics.console.time.unit";
-
-  /** Whether to dump all metrics when the job finishes */
-  public static final String DUMP_AT_END = "giraph.metrics.dump.at.end";
-
-  /** Use the Job ID as the group of the metrics */
-  private static String JOB_ID = "";
-
-  /** Has the metrics system be initialized? **/
-  private static boolean INITED = false;
-
-  /** The registry of metrics **/
-  private static MetricsRegistry REGISTRY = new EmptyMetricsRegistry();
-  /** The reporter for JMX **/
-  private static JmxReporter REPORTER;
-
-  /** Well, this is a private constructor... **/
-  private GiraphMetrics() { }
+  /** observers for per-superstep metrics re-initialization */
+  private final List<ResetSuperstepMetricsObserver> observers =
+      Lists.newArrayList();
 
   /**
-   * Initialize the GiraphMetrics
-   *
-   * @param context Mapper's context
+   * Initialize no-op registry that creates no-op metrics.
    */
-  public static synchronized void init(Mapper.Context context) {
-    if (INITED) {
-      return;
-    }
-
-    Configuration conf = context.getConfiguration();
-    if (conf.getBoolean(ENABLE, false)) {
-      REGISTRY = new MetricsRegistry();
-      REPORTER = new JmxReporter(REGISTRY);
-      REPORTER.start();
-      JOB_ID = context.getJobID().toString();
-      initConsole(conf);
-    }
-
-    INITED = true;
+  private GiraphMetrics() {
+    perJob = new GiraphMetricsRegistry();
+    perSuperstep = new SuperstepMetricsRegistry();
   }
 
   /**
-   * Initialize console printing
+   * Initialize GiraphMetrics with Hadoop Context
    *
-   * @param conf Configuration object used by this job
+   * @param context Hadoop Context to use.
    */
-  private static void initConsole(Configuration conf) {
-    if (conf.getBoolean(CONSOLE_ENABLE, false)) {
-      String timeUnitString = conf.get(CONSOLE_TIME_UNIT, "SECONDS");
-      TimeUnit timeUnit;
-      try {
-        timeUnit = TimeUnit.valueOf(timeUnitString.toUpperCase());
-      } catch (IllegalArgumentException iae) {
-        String values = Joiner.on(",").join(TimeUnit.values());
-        throw new IllegalArgumentException("Unable to parse " + timeUnitString +
-            " as value for " + CONSOLE_TIME_UNIT + ". Must be " +
-            "one of: " + values);
-      }
-
-      int period = conf.getInt(CONSOLE_PERIOD, 90);
-      ConsoleReporter.enable(REGISTRY, period, timeUnit);
-    }
+  private GiraphMetrics(Mapper.Context context) {
+    Configuration conf = context.getConfiguration();
+    perJob = new GiraphMetricsRegistry(conf, "giraph.job");
+    perSuperstep = new SuperstepMetricsRegistry(conf,
+        BspService.INPUT_SUPERSTEP);
   }
 
   /**
-   * Dump all metrics to output stream provided.
+   * Get singleton instance of GiraphMetrics.
    *
-   * @param out PrintStream to dump to.
+   * @return GiraphMetrics singleton instance
    */
-  public static void dumpToStream(PrintStream out) {
-    new ConsoleReporter(REGISTRY, out, MetricPredicate.ALL).run();
+  public static GiraphMetrics getInstance() {
+    return INSTANCE;
   }
 
   /**
-   * Dump all metrics to stdout.
-   */
-  public static void dumpToStdout() {
-    dumpToStream(System.out);
-  }
-
-  /**
-   * Create a MetricName using the job ID, group, and name.
-   * @param group what type of metric this is
-   * @param name String name given to metric
-   * @return MetricName for use with MetricsRegistry
-   */
-  private static MetricName makeMetricName(MetricGroup group, String name) {
-    return new MetricName(JOB_ID, group.toString().toLowerCase(), name);
-  }
-
-  /**
-   * Creates a new {@link Counter} and registers it under the given group
-   * and name.
+   * Initialize singleton instance of GiraphMetrics.
    *
-   * @param group what type of metric this is
-   * @param name the name of the metric
-   * @return a new {@link Counter}
+   * @param context Hadoop Context to use.
    */
-  public static Counter getCounter(MetricGroup group, String name) {
-    return REGISTRY.newCounter(makeMetricName(group, name));
+  public static void init(Mapper.Context context) {
+    INSTANCE = new GiraphMetrics(context);
   }
 
   /**
-   * Given a new {@link Gauge}, registers it under the given group and name.
+   * Get per-job metrics.
    *
-   * @param group  what type of metric this is
-   * @param name   the name of the metric
-   * @param metric the metric
-   * @param <T>    the type of the value returned by the metric
-   * @return {@code metric}
+   * @return per-job GiraphMetricsRegistry
    */
-  public static <T> Gauge<T> getGauge(MetricGroup group, String name,
-                                      Gauge<T> metric) {
-    return REGISTRY.newGauge(makeMetricName(group, name), metric);
+  public GiraphMetricsRegistry perJob() {
+    return perJob;
   }
 
   /**
-   * Creates a new non-biased {@link Histogram} and registers it under the given
-   * group and name.
+   * Get per-superstep metrics.
    *
-   * @param group what type of metric this is
-   * @param name  the name of the metric
-   * @return a new {@link Histogram}
+   * @return per-superstep SuperstepMetricsRegistry
    */
-  public static Histogram getHistogram(MetricGroup group, String name) {
-    return REGISTRY.newHistogram(makeMetricName(group, name), false);
+  public SuperstepMetricsRegistry perSuperstep() {
+    return perSuperstep;
   }
 
   /**
-   * Creates a new {@link Histogram} and registers it under the given group
-   * and name.
+   * Anyone using per-superstep counters needs to re-initialize their Metrics
+   * object on each new superstep. Otherwise they will always be updating just
+   * one counter. This method allows people to easily register a callback for
+   * when they should do the re-initializing.
    *
-   * @param group what type of metric this is
-   * @param name   the name of the metric
-   * @param biased whether or not the histogram should be biased
-   * @return a new {@link Histogram}
+   * @param observer ResetSuperstepMetricsObserver to notify on each reset
    */
-  public static Histogram getHistogram(MetricGroup group, String name,
-                                       boolean biased) {
-    return REGISTRY.newHistogram(makeMetricName(group, name), biased);
+  public void addSuperstepResetObserver(
+      ResetSuperstepMetricsObserver observer) {
+    observers.add(observer);
   }
 
   /**
-   * Creates a new {@link Meter} and registers it under the given group
-   * and name.
+   * Reset the per-superstep MetricsRegistry
    *
-   * @param group     what type of metric this is
-   * @param name      the name of the metric
-   * @param eventType the plural name of the type of events the meter is
-   *                  measuring (e.g., {@code "requests"})
-   * @param timeUnit  the rate unit of the new meter
-   * @return a new {@link Meter}
+   * @param superstep long number of superstep
    */
-  public static Meter getMeter(MetricGroup group, String name, String eventType,
-                               TimeUnit timeUnit) {
-    return REGISTRY.newMeter(makeMetricName(group, name), eventType, timeUnit);
+  public void resetSuperstepMetrics(long superstep) {
+    perSuperstep.setSuperstep(superstep);
+    for (ResetSuperstepMetricsObserver observer : observers) {
+      observer.newSuperstep(perSuperstep);
+    }
   }
 
   /**
-   * Creates a new {@link Timer} and registers it under the given group and
-   * name, measuring elapsed time in milliseconds and invocations per second.
+   * Dump all metrics to output stream provided.
    *
-   * @param group what type of metric this is
-   * @param name  the name of the metric
-   * @return a new {@link Timer}
+   * @param out PrintStream to dump to.
    */
-  public static Timer getTimer(MetricGroup group, String name) {
-    return getTimer(group, name, TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
+  public void dumpToStream(PrintStream out) {
+    perJob.printToStream(out);
+    perSuperstep.printToStream(out);
   }
 
   /**
-   * Creates a new {@link Timer} and registers it under the given
-   * group and name.
-   *
-   * @param group what type of metric this is
-   * @param name         the name of the metric
-   * @param durationUnit the duration scale unit of the new timer
-   * @param rateUnit     the rate scale unit of the new timer
-   * @return a new {@link Timer}
+   * Dump all metrics to stdout.
    */
-  public static Timer getTimer(MetricGroup group, String name,
-                               TimeUnit durationUnit, TimeUnit rateUnit) {
-    return REGISTRY.newTimer(makeMetricName(group, name),
-                             durationUnit, rateUnit);
+  public void dumpToStdout() {
+    dumpToStream(System.out);
   }
 }

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/GiraphMetricsRegistry.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/GiraphMetricsRegistry.java?rev=1406744&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/GiraphMetricsRegistry.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/GiraphMetricsRegistry.java Wed Nov  7 18:13:10 2012
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.metrics;
+
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.hadoop.conf.Configuration;
+
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricPredicate;
+import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.core.Timer;
+import com.yammer.metrics.reporting.ConsoleReporter;
+import com.yammer.metrics.reporting.JmxReporter;
+
+import java.io.PrintStream;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A holder for a Yammer MetricsRegistry together with its JmxReporter.
+ */
+public class GiraphMetricsRegistry {
+  /** Group name for metrics created; never null so names can be built */
+  private String groupName = "";
+  /** Internal Yammer registry used */
+  private final MetricsRegistry registry;
+  /** JmxReporter that sends metrics to JMX, null when metrics are disabled */
+  private final JmxReporter jmxReporter;
+
+  /**
+   * Create no-op registry that makes no-op metrics under an empty group name.
+   */
+  public GiraphMetricsRegistry() {
+    registry = new EmptyMetricsRegistry();
+    jmxReporter = null;
+  }
+
+  /**
+   * Create registry for a group; metrics are no-op unless METRICS_ENABLE is set.
+   *
+   * @param conf Hadoop Configuration to use.
+   * @param groupName String group to use for metrics.
+   */
+  public GiraphMetricsRegistry(Configuration conf, String groupName) {
+    this.groupName = groupName;
+    if (conf.getBoolean(GiraphConfiguration.METRICS_ENABLE, false)) {
+      registry = new MetricsRegistry();
+      jmxReporter = new JmxReporter(registry);
+      jmxReporter.start();
+    } else {
+      registry = new EmptyMetricsRegistry();
+      jmxReporter = null;
+    }
+  }
+
+  /**
+   * Set group name used by this MetricsRegistry. Used for incrementing
+   * superstep number to create a new hierarchy of metrics per superstep.
+   *
+   * @param groupName String group name to use.
+   */
+  protected void setGroupName(String groupName) {
+    this.groupName = groupName;
+  }
+
+  /**
+   * Print a blank line, then dump all the metrics to the PrintStream provided.
+   *
+   * @param out PrintStream to write metrics to.
+   */
+  public void printToStream(PrintStream out) {
+    out.println("");
+    new ConsoleReporter(registry, out, MetricPredicate.ALL).run();
+  }
+
+  /**
+   * Get internal MetricsRegistry used.
+   *
+   * @return MetricsRegistry being used.
+   */
+  protected MetricsRegistry getInternalRegistry() {
+    return registry;
+  }
+
+  /**
+   * Creates a new {@link com.yammer.metrics.core.Counter} and registers it
+   * under the given group and name.
+   *
+   * @param group what type of metric this is
+   * @param name the name of the metric
+   * @return a new {@link com.yammer.metrics.core.Counter}
+   */
+  public Counter getCounter(MetricGroup group, String name) {
+    return registry.newCounter(makeMetricName(group, name));
+  }
+
+  /**
+   * Given a new {@link com.yammer.metrics.core.Gauge}, registers it under the
+   * given group and name.
+   *
+   * @param group  what type of metric this is
+   * @param name   the name of the metric
+   * @param metric the metric
+   * @param <T>    the type of the value returned by the metric
+   * @return {@code metric}
+   */
+  public <T> Gauge<T> getGauge(MetricGroup group, String name,
+                               Gauge<T> metric) {
+    return registry.newGauge(makeMetricName(group, name), metric);
+  }
+
+  /**
+   * Creates a new non-biased {@link com.yammer.metrics.core.Histogram} and
+   * registers it under the given group and name.
+   *
+   * @param group what type of metric this is
+   * @param name  the name of the metric
+   * @return a new {@link com.yammer.metrics.core.Histogram}
+   */
+  public Histogram getHistogram(MetricGroup group, String name) {
+    return registry.newHistogram(makeMetricName(group, name), false);
+  }
+
+  /**
+   * Creates a new {@link Histogram} and registers it under the given group
+   * and name.
+   *
+   * @param group what type of metric this is
+   * @param name   the name of the metric
+   * @param biased whether or not the histogram should be biased
+   * @return a new {@link Histogram}
+   */
+  public Histogram getHistogram(MetricGroup group, String name,
+                                boolean biased) {
+    return registry.newHistogram(makeMetricName(group, name), biased);
+  }
+
+  /**
+   * Creates a new {@link com.yammer.metrics.core.Meter} and registers it under
+   * the given group and name.
+   *
+   * @param group     what type of metric this is
+   * @param name      the name of the metric
+   * @param eventType the plural name of the type of events the meter is
+   *                  measuring (e.g., {@code "requests"})
+   * @param timeUnit  the rate unit of the new meter
+   * @return a new {@link com.yammer.metrics.core.Meter}
+   */
+  public Meter getMeter(MetricGroup group, String name, String eventType,
+                        TimeUnit timeUnit) {
+    return registry.newMeter(makeMetricName(group, name), eventType, timeUnit);
+  }
+
+  /**
+   * Creates a new {@link com.yammer.metrics.core.Timer} and registers it under
+   * the given group and name, measuring elapsed time in milliseconds and
+   * invocations per second.
+   *
+   * @param group what type of metric this is
+   * @param name  the name of the metric
+   * @return a new {@link com.yammer.metrics.core.Timer}
+   */
+  public Timer getTimer(MetricGroup group, String name) {
+    return getTimer(group, name, TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Creates a new {@link Timer} and registers it under the given
+   * group and name.
+   *
+   * @param group what type of metric this is
+   * @param name         the name of the metric
+   * @param durationUnit the duration scale unit of the new timer
+   * @param rateUnit     the rate scale unit of the new timer
+   * @return a new {@link Timer}
+   */
+  public Timer getTimer(MetricGroup group, String name,
+                        TimeUnit durationUnit, TimeUnit rateUnit) {
+    return registry.newTimer(makeMetricName(group, name),
+                             durationUnit, rateUnit);
+  }
+
+  /**
+   * Create a MetricName from the registry group name, metric group and name.
+   * @param group what type of metric this is, lower-cased locale-independently
+   * @param name String name given to metric
+   * @return MetricName for use with MetricsRegistry
+   */
+  protected MetricName makeMetricName(MetricGroup group, String name) {
+    String type = group.toString().toLowerCase(java.util.Locale.ROOT);
+    return new MetricName(groupName, type, name);
+  }
+}

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ResetSuperstepMetricsObserver.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ResetSuperstepMetricsObserver.java?rev=1406744&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ResetSuperstepMetricsObserver.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ResetSuperstepMetricsObserver.java Wed Nov  7 18:13:10 2012
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.metrics;
+
+/**
+ * Observer for users of per-superstep metrics. Notified whenever a new
+ * superstep is starting so that the user can re-initialize their metrics.
+ */
+public interface ResetSuperstepMetricsObserver {
+  /**
+   * Called when a new superstep is starting. Re-initialize your metrics here.
+   *
+   * @param superstepMetrics SuperstepMetricsRegistry being used.
+   */
+  void newSuperstep(SuperstepMetricsRegistry superstepMetrics);
+}

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java?rev=1406744&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java Wed Nov  7 18:13:10 2012
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.metrics;
+
+import org.apache.giraph.graph.BspService;
+import org.apache.giraph.graph.BspServiceWorker;
+import org.apache.giraph.graph.ComputeCallable;
+import org.apache.giraph.graph.GraphMapper;
+import org.apache.hadoop.conf.Configuration;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Metric;
+import com.yammer.metrics.core.Timer;
+
+import java.io.PrintStream;
+
+/**
+ * GiraphMetricsRegistry whose metrics group is named after a superstep.
+ */
+public class SuperstepMetricsRegistry extends GiraphMetricsRegistry {
+  /** Superstep number reported by printSummary; starts at INPUT_SUPERSTEP */
+  private long superstep = BspService.INPUT_SUPERSTEP;
+
+  /**
+   * Create no-op registry that creates no-op metrics.
+   */
+  public SuperstepMetricsRegistry() {
+    super();
+  }
+
+  /**
+   * Create with Hadoop Configuration and superstep number.
+   *
+   * @param conf Hadoop Configuration to use.
+   * @param superstep number of superstep to use as group for metrics.
+   */
+  public SuperstepMetricsRegistry(Configuration conf, long superstep) {
+    super(conf, makeSuperstepGroupName(superstep));
+    this.superstep = superstep;
+  }
+
+  /**
+   * Set superstep number used. Also switches the group for metrics created.
+   *
+   * @param superstep long number of superstep to use.
+   */
+  public void setSuperstep(long superstep) {
+    super.setGroupName(makeSuperstepGroupName(superstep));
+    this.superstep = superstep;
+  }
+
+  /**
+   * Create group name to use for superstep: "giraph.superstep." + number.
+   *
+   * @param superstep long value of superstep to use.
+   * @return String group for superstep to use for metrics created.
+   */
+  private static String makeSuperstepGroupName(long superstep) {
+    return "giraph.superstep." + superstep;
+  }
+
+  /**
+   * Print readable superstep summary; gauges not registered print as "null".
+   *
+   * @param out PrintStream to write to.
+   */
+  public void printSummary(PrintStream out) {
+    Long commTime = getGaugeValue(MetricGroup.NETWORK,
+        GraphMapper.GAUGE_COMMUNICATION_TIME);
+    Long computeAllTime = getGaugeValue(MetricGroup.COMPUTE,
+        GraphMapper.GAUGE_COMPUTE_ALL);
+    Long timeToFirstMsg = getGaugeValue(MetricGroup.NETWORK,
+        GraphMapper.GAUGE_TIME_TO_FIRST_MSG);
+    Long superstepTime = getGaugeValue(MetricGroup.COMPUTE,
+        GraphMapper.GAUGE_SUPERSTEP_TIME);
+    Long waitingMs = getGaugeValue(MetricGroup.NETWORK,
+        BspServiceWorker.GAUGE_WAITING_TIME);
+    Timer computeOne = getTimer(MetricGroup.COMPUTE,
+        ComputeCallable.TIMER_COMPUTE_ONE);
+    double userComputeTime = computeOne.mean() * computeOne.count(); // total
+
+    out.println("");
+    out.println("Superstep " + superstep + ":");
+    out.println("  superstep time: " + superstepTime + " ms");
+    out.println("  time to first message: " + timeToFirstMsg + " ms");
+    out.println("  compute time: " + computeAllTime + " ms");
+    out.println("  user compute time: " + userComputeTime + " ms");
+    out.println("  network communication time: " + commTime + " ms");
+    out.println("  waiting time: " + waitingMs + " ms");
+  }
+
+  /**
+   * Print human readable summary of superstep metrics to System.out.
+   */
+  public void printSummary() {
+    printSummary(System.out);
+  }
+
+  /**
+   * Look up a Gauge that is already present in the MetricsRegistry.
+   *
+   * @param group MetricGroup Gauge belongs to
+   * @param name String name of Gauge
+   * @param <T> value type Gauge returns (unchecked cast, caller must match)
+   * @return Gauge<T> from MetricsRegistry, or null if absent or not a Gauge
+   */
+  private <T> Gauge<T> getExistingGauge(MetricGroup group, String name) {
+    Metric metric = getInternalRegistry().allMetrics().
+        get(makeMetricName(group, name));
+    return metric instanceof Gauge ? (Gauge<T>) metric : null;
+  }
+
+  /**
+   * Get value of Gauge that is already present in the MetricsRegistry.
+   *
+   * @param group MetricGroup Gauge belongs to
+   * @param name String name of Gauge
+   * @param <T> value type Gauge returns
+   * @return T value of the Gauge, or null if no such Gauge is registered
+   */
+  private <T> T getGaugeValue(MetricGroup group, String name) {
+    Gauge<T> gauge = getExistingGauge(group, name);
+    return gauge == null ? null : gauge.value();
+  }
+}

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ValueGauge.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ValueGauge.java?rev=1406744&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ValueGauge.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ValueGauge.java Wed Nov  7 18:13:10 2012
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.metrics;
+
+import com.yammer.metrics.core.Gauge;
+
+/**
+ * A Gauge that holds a value assigned through {@link #set}.
+ *
+ * @param <T> type of value being held.
+ */
+public class ValueGauge<T extends Number> extends Gauge<T> {
+  /** value held by this class; null until set() has been called */
+  private T value;
+
+  /**
+   * Constructor that registers this Gauge in the given registry.
+   *
+   * @param registry GiraphMetricsRegistry to use.
+   * @param group MetricGroup for Gauge.
+   * @param name String name of Gauge.
+   */
+  public ValueGauge(GiraphMetricsRegistry registry, MetricGroup group,
+                    String name) {
+    registry.getGauge(group, name, this);
+  }
+
+  @Override
+  public T value() {
+    return value;
+  }
+
+  /**
+   * Get double representation of value held. Fails if no value was set.
+   *
+   * @return double value
+   */
+  public double getDouble() {
+    return value.doubleValue();
+  }
+
+  /**
+   * Get long representation of value held. Fails if no value was set.
+   *
+   * @return long value
+   */
+  public long getLong() {
+    return value.longValue();
+  }
+
+  /**
+   * Set value held by this object.
+   *
+   * @param value value to set.
+   */
+  public void set(T value) {
+    this.value = value;
+  }
+}

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/FakeTime.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/FakeTime.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/FakeTime.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/FakeTime.java Wed Nov  7 18:13:10 2012
@@ -35,31 +35,16 @@ public class FakeTime implements Time {
   }
 
   @Override
-  public long getMillisecondsSince(long previousMilliseconds) {
-    return getMilliseconds() - previousMilliseconds;
-  }
-
-  @Override
   public long getNanoseconds() {
     return nanosecondsSinceEpoch.get();
   }
 
   @Override
-  public long getNanosecondsSince(long previousNanoseconds) {
-    return getNanoseconds() - previousNanoseconds;
-  }
-
-  @Override
   public int getSeconds() {
     return (int) (nanosecondsSinceEpoch.get() / NS_PER_SECOND);
   }
 
   @Override
-  public int getSecondsSince(int previousSeconds) {
-    return getSeconds() - previousSeconds;
-  }
-
-  @Override
   public Date getCurrentDate() {
     return new Date(getMilliseconds());
   }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/MemoryUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/MemoryUtils.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/MemoryUtils.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/MemoryUtils.java Wed Nov  7 18:13:10 2012
@@ -20,6 +20,7 @@ package org.apache.giraph.utils;
 
 import com.yammer.metrics.util.PercentGauge;
 import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.GiraphMetricsRegistry;
 import org.apache.giraph.metrics.MetricGroup;
 
 /**
@@ -66,8 +67,9 @@ public class MemoryUtils {
    * Initialize metrics tracked by this helper.
    */
   public static void initMetrics() {
-    GiraphMetrics.getGauge(MetricGroup.SYSTEM, "memory-free-pct",
-                           new PercentGauge() {
+    GiraphMetricsRegistry metrics = GiraphMetrics.getInstance().perJob();
+    metrics.getGauge(MetricGroup.SYSTEM, "memory-free-pct",
+      new PercentGauge() {
         @Override
         protected double getNumerator() {
           return freeMemoryMB();

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/SystemTime.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/SystemTime.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/SystemTime.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/SystemTime.java Wed Nov  7 18:13:10 2012
@@ -36,32 +36,16 @@ public class SystemTime implements Time 
   }
 
   @Override
-  public long getMillisecondsSince(long previousMilliseconds) {
-    return getMilliseconds() - previousMilliseconds;
-  }
-
-
-  @Override
   public long getNanoseconds() {
     return System.nanoTime();
   }
 
   @Override
-  public long getNanosecondsSince(long previousNanoseconds) {
-    return getNanoseconds() - previousNanoseconds;
-  }
-
-  @Override
   public int getSeconds() {
     return (int) (getMilliseconds() / MS_PER_SECOND);
   }
 
   @Override
-  public int getSecondsSince(int previousSeconds) {
-    return getSeconds() - previousSeconds;
-  }
-
-  @Override
   public Date getCurrentDate() {
     return new Date();
   }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java Wed Nov  7 18:13:10 2012
@@ -62,15 +62,6 @@ public interface Time {
   long getMilliseconds();
 
   /**
-   * Convenience method to get milliseconds since a previous milliseconds
-   * point.
-   *
-   * @param previousMilliseconds Previous milliseconds
-   * @return Milliseconds elapsed since the previous milliseconds
-   */
-  long getMillisecondsSince(long previousMilliseconds);
-
-  /**
    * Get the current nanoseconds
    *
    * @return The difference, measured in nanoseconds, between
@@ -79,15 +70,6 @@ public interface Time {
   long getNanoseconds();
 
   /**
-   * Convenience method to get nanoseconds since a previous nanoseconds
-   * point.
-   *
-   * @param previousNanoseconds Previous nanoseconds
-   * @return Nanoseconds elapsed since the previous nanoseconds
-   */
-  long getNanosecondsSince(long previousNanoseconds);
-
-  /**
    * Get the current seconds
    *
    * @return The difference, measured in seconds, between
@@ -96,15 +78,6 @@ public interface Time {
   int getSeconds();
 
   /**
-   * Convenience method to get seconds since a previous seconds
-   * point.
-   *
-   * @param previousSeconds Previous seconds
-   * @return Seconds elapsed since the previous seconds
-   */
-  int getSecondsSince(int previousSeconds);
-
-  /**
    * Get the current date
    *
    * @return Current date