You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@giraph.apache.org by ap...@apache.org on 2012/11/07 19:13:12 UTC
svn commit: r1406744 [1/2] - in /giraph/trunk: ./
giraph/src/main/java/org/apache/giraph/
giraph/src/main/java/org/apache/giraph/comm/netty/
giraph/src/main/java/org/apache/giraph/comm/netty/handler/
giraph/src/main/java/org/apache/giraph/graph/ giraph...
Author: apresta
Date: Wed Nov 7 18:13:10 2012
New Revision: 1406744
URL: http://svn.apache.org/viewvc?rev=1406744&view=rev
Log:
GIRAPH-407: Metrics Update (nitay via apresta)
Added:
giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/GiraphMetricsRegistry.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ResetSuperstepMetricsObserver.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ValueGauge.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Times.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestInfo.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallable.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/FakeTime.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/MemoryUtils.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/SystemTime.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestPredicateLock.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/BspUtilsTest.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Wed Nov 7 18:13:10 2012
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-407: Metrics Update (nitay via apresta)
+
GIRAPH-404: More SendMessageCache improvements (majakabiljo)
GIRAPH-412: Checkstyle error from Giraph-403 (majakabiljo)
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java Wed Nov 7 18:13:10 2012
@@ -28,7 +28,6 @@ import org.apache.giraph.graph.VertexOut
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.graph.WorkerContext;
import org.apache.giraph.graph.partition.GraphPartitionerFactory;
-import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.hadoop.conf.Configuration;
/**
@@ -131,6 +130,15 @@ public class GiraphConfiguration extends
/** Default poll msecs (30 seconds) */
public static final int POLL_MSECS_DEFAULT = 30 * 1000;
+ /** Enable the Metrics system **/
+ public static final String METRICS_ENABLE = "giraph.metrics.enable";
+
+ /** Whether to dump all metrics when the job finishes */
+ public static final String METRICS_DUMP_AT_END = "giraph.metrics.dump.at.end";
+
+ /** Whether to print superstep metrics */
+ public static final String METRICS_SUPERSTEP_PRINT = "giraph.metrics.print";
+
/**
* ZooKeeper comma-separated list (if not set,
* will start up ZooKeeper locally)
@@ -796,7 +804,16 @@ public class GiraphConfiguration extends
* @return true if we should dump metrics, false otherwise.
*/
public boolean dumpMetricsAtEnd() {
- return getBoolean(GiraphMetrics.DUMP_AT_END, false);
+ return getBoolean(METRICS_DUMP_AT_END, false);
+ }
+
+ /**
+ * Should we print superstep metrics at end of superstep.
+ *
+ * @return true if we should print metrics, false otherwise.
+ */
+ public boolean printSuperstepMetrics() {
+ return getBoolean(METRICS_SUPERSTEP_PRINT, false);
}
/**
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java Wed Nov 7 18:13:10 2012
@@ -18,28 +18,34 @@
package org.apache.giraph.comm.netty;
-import java.text.DecimalFormat;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.yammer.metrics.core.Histogram;
-import com.yammer.metrics.core.Meter;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.MetricGroup;
-import org.apache.log4j.Logger;
+import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
+import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.giraph.utils.SystemTime;
import org.apache.giraph.utils.Time;
+import org.apache.log4j.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.NoOpHistogram;
+import com.yammer.metrics.core.NoOpMeter;
+
+import java.text.DecimalFormat;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* Keep track of the bytes sent/received and provide some metrics when
* desired as part of the Netty Channel stack.
*/
-public class ByteCounter extends SimpleChannelHandler {
+public class ByteCounter extends SimpleChannelHandler implements
+ ResetSuperstepMetricsObserver {
/** Megabyte in bytes */
public static final double MEGABYTE = 1024f * 1024f;
/** Helper to format the doubles */
@@ -64,25 +70,30 @@ public class ByteCounter extends SimpleC
private final AtomicLong metricsWindowLastUpdatedMsecs = new AtomicLong();
// Metrics
- /** meter of requests sent */
- private final Meter sentRequestsMeter;
+ /** Meter of requests sent */
+ private Meter sentRequestsMeter = NoOpMeter.INSTANCE;
/** Histogram of bytes sent */
- private final Histogram sentBytesHist;
+ private Histogram sentBytesHist = NoOpHistogram.INSTANCE;
/** Meter of requests received */
- private final Meter receivedRequestsMeter;
+ private Meter receivedRequestsMeter = NoOpMeter.INSTANCE;
/** Histogram of bytes received */
- private final Histogram receivedBytesHist;
+ private Histogram receivedBytesHist = NoOpHistogram.INSTANCE;
/** Constructor */
public ByteCounter() {
// Initialize Metrics
- sentRequestsMeter = GiraphMetrics.getMeter(MetricGroup.NETWORK,
- "sent-requests", "requests", TimeUnit.SECONDS);
- sentBytesHist = GiraphMetrics.getHistogram(MetricGroup.NETWORK,
+ GiraphMetrics.getInstance().addSuperstepResetObserver(this);
+ }
+
+ @Override
+ public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
+ sentRequestsMeter = superstepMetrics.getMeter(MetricGroup.NETWORK,
+ "sent-requests", "requests", TimeUnit.SECONDS);
+ sentBytesHist = superstepMetrics.getHistogram(MetricGroup.NETWORK,
"sent-bytes", false);
- receivedRequestsMeter = GiraphMetrics.getMeter(MetricGroup.NETWORK,
+ receivedRequestsMeter = superstepMetrics.getMeter(MetricGroup.NETWORK,
"received-requests", "request", TimeUnit.SECONDS);
- receivedBytesHist = GiraphMetrics.getHistogram(MetricGroup.NETWORK,
+ receivedBytesHist = superstepMetrics.getHistogram(MetricGroup.NETWORK,
"received-bytes", false);
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java Wed Nov 7 18:13:10 2012
@@ -42,6 +42,7 @@ import org.apache.giraph.graph.partition
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.MetricGroup;
+import org.apache.giraph.metrics.ValueGauge;
import org.apache.giraph.utils.PairList;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -49,7 +50,6 @@ import org.apache.hadoop.mapreduce.Mappe
import org.apache.log4j.Logger;
import com.google.common.collect.Maps;
-import com.yammer.metrics.core.Histogram;
import java.io.IOException;
import java.util.Collection;
@@ -92,9 +92,10 @@ public class NettyWorkerClientRequestPro
private final CentralizedServiceWorker<I, V, E, M> serviceWorker;
/** Server data from the server (used for local requests) */
private final ServerData<I, V, E, M> serverData;
- // Metrics
- /** histogram of messages sent in a superstep */
- private final Histogram msgsSentInSuperstepHist;
+
+ // Per-Superstep Metrics
+ /** messages sent in a superstep */
+ private final ValueGauge<Long> msgsSentInSuperstep;
/**
* Constructor.
@@ -123,9 +124,11 @@ public class NettyWorkerClientRequestPro
this.serviceWorker = serviceWorker;
this.serverData = serviceWorker.getServerData();
- // Initialize Metrics
- msgsSentInSuperstepHist = GiraphMetrics.getHistogram(
- MetricGroup.NETWORK, "superstep-msgs-sent");
+ // Per-Superstep Metrics.
+ // Since this object is not long lived we just initialize the metrics here.
+ GiraphMetrics gmr = GiraphMetrics.getInstance();
+ msgsSentInSuperstep = new ValueGauge<Long>(gmr.perSuperstep(),
+ MetricGroup.NETWORK, "msgs-sent");
}
@Override
@@ -352,7 +355,7 @@ public class NettyWorkerClientRequestPro
@Override
public long resetMessageCount() {
- msgsSentInSuperstepHist.update(totalMsgsSentInSuperstep);
+ msgsSentInSuperstep.set(totalMsgsSentInSuperstep);
long messagesSentInSuperstep = totalMsgsSentInSuperstep;
totalMsgsSentInSuperstep = 0;
return messagesSentInSuperstep;
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java Wed Nov 7 18:13:10 2012
@@ -23,8 +23,9 @@ import org.apache.giraph.comm.netty.Byte
import org.apache.giraph.comm.requests.RequestType;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.utils.ReflectionUtils;
-
import org.apache.giraph.utils.SystemTime;
+import org.apache.giraph.utils.Time;
+import org.apache.giraph.utils.Times;
import org.apache.log4j.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream;
@@ -39,6 +40,8 @@ public class RequestDecoder extends OneT
/** Class logger */
private static final Logger LOG =
Logger.getLogger(RequestDecoder.class);
+ /** Time class to use */
+ private static final Time TIME = SystemTime.getInstance();
/** Configuration */
private final ImmutableClassesGiraphConfiguration conf;
/** Byte counter to output */
@@ -73,7 +76,7 @@ public class RequestDecoder extends OneT
}
if (LOG.isDebugEnabled()) {
- startDecodingNanoseconds = SystemTime.getInstance().getNanoseconds();
+ startDecodingNanoseconds = TIME.getNanoseconds();
}
// Decode the request
@@ -92,8 +95,7 @@ public class RequestDecoder extends OneT
", requestId " + writableRequest.getRequestId() +
", " + writableRequest.getType() + ", with size " +
buffer.array().length + " took " +
- SystemTime.getInstance().getNanosecondsSince(
- startDecodingNanoseconds) + " ns");
+ Times.getNanosSince(TIME, startDecodingNanoseconds) + " ns");
}
return writableRequest;
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java Wed Nov 7 18:13:10 2012
@@ -20,7 +20,8 @@ package org.apache.giraph.comm.netty.han
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.utils.SystemTime;
-
+import org.apache.giraph.utils.Time;
+import org.apache.giraph.utils.Times;
import org.apache.log4j.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferOutputStream;
@@ -33,9 +34,10 @@ import org.jboss.netty.handler.codec.one
* Requests have a request type and an encoded request.
*/
public class RequestEncoder extends OneToOneEncoder {
+ /** Time class to use */
+ private static final Time TIME = SystemTime.getInstance();
/** Class logger */
- private static final Logger LOG =
- Logger.getLogger(RequestEncoder.class);
+ private static final Logger LOG = Logger.getLogger(RequestEncoder.class);
/** Holds the place of the message length until known */
private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
/** Buffer starting size */
@@ -62,7 +64,7 @@ public class RequestEncoder extends OneT
// Encode the request
if (LOG.isDebugEnabled()) {
- startEncodingNanoseconds = SystemTime.getInstance().getNanoseconds();
+ startEncodingNanoseconds = TIME.getNanoseconds();
}
WritableRequest writableRequest = (WritableRequest) msg;
ChannelBufferOutputStream outputStream =
@@ -83,8 +85,7 @@ public class RequestEncoder extends OneT
"requestId " + writableRequest.getRequestId() +
", size = " + encodedBuffer.writerIndex() + ", " +
writableRequest.getType() + " took " +
- SystemTime.getInstance().getNanosecondsSince(
- startEncodingNanoseconds) + " ns");
+ Times.getNanosSince(TIME, startEncodingNanoseconds) + " ns");
}
return encodedBuffer;
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestInfo.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestInfo.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestInfo.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestInfo.java Wed Nov 7 18:13:10 2012
@@ -18,17 +18,20 @@
package org.apache.giraph.comm.netty.handler;
-import java.net.InetSocketAddress;
-import java.util.Date;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.utils.SystemTime;
import org.apache.giraph.utils.Time;
import org.jboss.netty.channel.ChannelFuture;
+import java.net.InetSocketAddress;
+import java.util.Date;
+
/**
* Help track requests throughout the system
*/
public class RequestInfo {
+ /** Time class to use */
+ private static final Time TIME = SystemTime.getInstance();
/** Destination of the request */
private final InetSocketAddress destinationAddress;
/** When the request was started */
@@ -48,7 +51,7 @@ public class RequestInfo {
WritableRequest request) {
this.destinationAddress = destinationAddress;
this.request = request;
- this.startedNanos = SystemTime.getInstance().getNanoseconds();
+ this.startedNanos = TIME.getNanoseconds();
}
public InetSocketAddress getDestinationAddress() {
@@ -70,7 +73,7 @@ public class RequestInfo {
* @return Nanoseconds since the request was started
*/
public long getElapsedNanos() {
- return SystemTime.getInstance().getNanoseconds() - startedNanos;
+ return TIME.getNanoseconds() - startedNanos;
}
/**
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java Wed Nov 7 18:13:10 2012
@@ -21,6 +21,8 @@ package org.apache.giraph.comm.netty.han
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.utils.SystemTime;
+import org.apache.giraph.utils.Time;
+import org.apache.giraph.utils.Times;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -40,6 +42,8 @@ public abstract class RequestServerHandl
SimpleChannelUpstreamHandler {
/** Number of bytes in the encoded response */
public static final int RESPONSE_BYTES = 13;
+ /** Time class to use */
+ private static Time TIME = SystemTime.getInstance();
/** Class logger */
private static final Logger LOG =
Logger.getLogger(RequestServerHandler.class);
@@ -95,7 +99,7 @@ public abstract class RequestServerHandl
writableRequest.getClientId(),
writableRequest.getRequestId())) {
if (LOG.isDebugEnabled()) {
- startProcessingNanoseconds = SystemTime.getInstance().getNanoseconds();
+ startProcessingNanoseconds = TIME.getNanoseconds();
}
processRequest((R) writableRequest);
if (LOG.isDebugEnabled()) {
@@ -103,8 +107,7 @@ public abstract class RequestServerHandl
writableRequest.getClientId() + ", " +
"requestId " + writableRequest.getRequestId() +
", " + writableRequest.getType() + " took " +
- SystemTime.getInstance().getNanosecondsSince(
- startProcessingNanoseconds) + " ns");
+ Times.getNanosSince(TIME, startProcessingNanoseconds) + " ns");
}
alreadyDone = 0;
} else {
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Wed Nov 7 18:13:10 2012
@@ -30,7 +30,15 @@ import org.apache.giraph.graph.partition
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.giraph.graph.partition.PartitionStats;
import org.apache.giraph.graph.partition.PartitionUtils;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.MetricGroup;
+import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
+import org.apache.giraph.metrics.SuperstepMetricsRegistry;
+import org.apache.giraph.metrics.ValueGauge;
import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.utils.SystemTime;
+import org.apache.giraph.utils.Time;
+import org.apache.giraph.utils.Times;
import org.apache.giraph.utils.WritableUtils;
import org.apache.giraph.zk.BspEvent;
import org.apache.giraph.zk.PredicateLock;
@@ -89,7 +97,8 @@ import java.util.concurrent.Executors;
public class BspServiceMaster<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
extends BspService<I, V, E, M>
- implements CentralizedServiceMaster<I, V, E, M> {
+ implements CentralizedServiceMaster<I, V, E, M>,
+ ResetSuperstepMetricsObserver {
/** Counter group name for the Giraph statistics */
public static final String GIRAPH_STATS_COUNTER_GROUP_NAME = "Giraph Stats";
/** Print worker names only if there are 10 workers left */
@@ -99,6 +108,8 @@ public class BspServiceMaster<I extends
"giraph.inputSplitThreadCount";
/** Default number of threads to use when writing input splits to zookeeper */
public static final int DEFAULT_INPUT_SPLIT_THREAD_COUNT = 1;
+ /** Time instance to use for timing */
+ private static final Time TIME = SystemTime.getInstance();
/** Class logger */
private static final Logger LOG = Logger.getLogger(BspServiceMaster.class);
/** Superstep counter */
@@ -155,6 +166,10 @@ public class BspServiceMaster<I extends
/** Limit locality information added to each InputSplit znode */
private final int localityLimit = 5;
+ // Per-Superstep Metrics
+ /** MasterCompute time in msec */
+ private ValueGauge<Long> masterComputeMs;
+
/**
* Constructor for setting up the master.
*
@@ -183,7 +198,7 @@ public class BspServiceMaster<I extends
100.0f);
msecsPollPeriod =
getConfiguration().getInt(GiraphConfiguration.POLL_MSECS,
- GiraphConfiguration.POLL_MSECS_DEFAULT);
+ GiraphConfiguration.POLL_MSECS_DEFAULT);
maxPollAttempts =
getConfiguration().getInt(GiraphConfiguration.POLL_ATTEMPTS,
GiraphConfiguration.POLL_ATTEMPTS_DEFAULT);
@@ -192,6 +207,14 @@ public class BspServiceMaster<I extends
GiraphConfiguration.PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT);
masterGraphPartitioner =
getGraphPartitionerFactory().createMasterGraphPartitioner();
+
+ GiraphMetrics.getInstance().addSuperstepResetObserver(this);
+ }
+
+ @Override
+ public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
+ masterComputeMs = new ValueGauge<Long>(superstepMetrics, MetricGroup.USER,
+ "master-compute-call");
}
@Override
@@ -791,7 +814,7 @@ public class BspServiceMaster<I extends
if (masterChildArr.get(0).equals(myBid)) {
currentMasterTaskPartitionCounter.increment(
getTaskPartition() -
- currentMasterTaskPartitionCounter.getValue());
+ currentMasterTaskPartitionCounter.getValue());
masterCompute = getConfiguration().createMasterCompute();
aggregatorHandler = new MasterAggregatorHandler(getConfiguration(),
getContext());
@@ -1480,7 +1503,9 @@ public class BspServiceMaster<I extends
"runMasterCompute: Failed in access", e);
}
}
+ long masterComputeBeginMs = TIME.getMilliseconds();
masterCompute.compute();
+ masterComputeMs.set(Times.getMsSince(TIME, masterComputeBeginMs));
}
/**
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Wed Nov 7 18:13:10 2012
@@ -39,9 +39,15 @@ import org.apache.giraph.graph.partition
import org.apache.giraph.graph.partition.WorkerGraphPartitioner;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.MetricGroup;
+import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
+import org.apache.giraph.metrics.SuperstepMetricsRegistry;
+import org.apache.giraph.metrics.ValueGauge;
import org.apache.giraph.utils.LoggerUtils;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.utils.SystemTime;
+import org.apache.giraph.utils.Time;
+import org.apache.giraph.utils.Times;
import org.apache.giraph.utils.WritableUtils;
import org.apache.giraph.zk.BspEvent;
import org.apache.giraph.zk.PredicateLock;
@@ -64,9 +70,6 @@ import org.json.JSONObject;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.core.Timer;
-import com.yammer.metrics.core.TimerContext;
import net.iharder.Base64;
import java.io.ByteArrayOutputStream;
@@ -99,7 +102,12 @@ import java.util.concurrent.Future;
public class BspServiceWorker<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
extends BspService<I, V, E, M>
- implements CentralizedServiceWorker<I, V, E, M> {
+ implements CentralizedServiceWorker<I, V, E, M>,
+ ResetSuperstepMetricsObserver {
+ /** Name of gauge for time spent waiting on other workers */
+ public static final String GAUGE_WAITING_TIME = "waiting-ms";
+ /** Time instance to use for timing */
+ private static final Time TIME = SystemTime.getInstance();
/** Class logger */
private static final Logger LOG = Logger.getLogger(BspServiceWorker.class);
/** My process health znode */
@@ -135,9 +143,11 @@ public class BspServiceWorker<I extends
/** Handler for aggregators */
private final WorkerAggregatorHandler aggregatorHandler;
-
- /** timer waiting for other workers */
- private final Timer waitTimer;
+ // Per-Superstep Metrics
+ /** msec spent in WorkerContext#postSuperstep */
+ private ValueGauge<Long> wcPostSuperstepMs;
+ /** msec spent waiting for other workers */
+ private ValueGauge<Long> waitMs;
/**
* Constructor for setting up the worker.
@@ -171,29 +181,20 @@ public class BspServiceWorker<I extends
workerInfo = new WorkerInfo(
getHostname(), getTaskPartition(), workerServer.getPort());
- this.workerContext =
- getConfiguration().createWorkerContext(null);
+ this.workerContext = getConfiguration().createWorkerContext(null);
aggregatorHandler =
new WorkerAggregatorHandler(this, getConfiguration(), context);
- waitTimer = GiraphMetrics.getTimer(MetricGroup.NETWORK, "waiting");
-
- initGauges();
+ GiraphMetrics.getInstance().addSuperstepResetObserver(this);
}
- /**
- * Initialize Metrics used by this class
- */
- private void initGauges() {
- GiraphMetrics.getGauge(MetricGroup.COMPUTE, "partition-map-size",
- new Gauge<Integer>() {
- @Override
- public Integer value() {
- return getPartitionStore().getNumPartitions();
- }
- }
- );
+ @Override
+ public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
+ waitMs = new ValueGauge<Long>(superstepMetrics, MetricGroup.NETWORK,
+ GAUGE_WAITING_TIME);
+ wcPostSuperstepMs = new ValueGauge<Long>(superstepMetrics,
+ MetricGroup.USER, "worker-context-post-superstep-ms");
}
@Override
@@ -727,7 +728,9 @@ else[HADOOP_NON_SECURE]*/
if (getSuperstep() != INPUT_SUPERSTEP) {
getWorkerContext().setGraphState(graphState);
+ long postSuperstepBeginMs = TIME.getMilliseconds();
getWorkerContext().postSuperstep();
+ wcPostSuperstepMs.set(Times.getMsSince(TIME, postSuperstepBeginMs));
getContext().progress();
}
@@ -785,7 +788,7 @@ else[HADOOP_NON_SECURE]*/
String superstepFinishedNode =
getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
- TimerContext waitTimerContext = waitTimer.time();
+ long waitBeginMs = TIME.getMilliseconds();
try {
while (getZkExt().exists(superstepFinishedNode, true) == null) {
getSuperstepFinishedEvent().waitForever();
@@ -800,7 +803,8 @@ else[HADOOP_NON_SECURE]*/
"finishSuperstep: Failed while waiting for master to " +
"signal completion of superstep " + getSuperstep(), e);
}
- waitTimerContext.stop();
+
+ waitMs.set(Times.getMsSince(TIME, waitBeginMs));
GlobalStats globalStats = new GlobalStats();
WritableUtils.readFieldsFromZnode(
@@ -814,6 +818,7 @@ else[HADOOP_NON_SECURE]*/
getGraphMapper().getMapFunctions().toString() +
" - Attempt=" + getApplicationAttempt() +
", Superstep=" + getSuperstep());
+
return new FinishedSuperstepStats(
globalStats.getHaltComputation(),
globalStats.getVertexCount(),
@@ -897,7 +902,7 @@ else[HADOOP_NON_SECURE]*/
}
if (getConfiguration().dumpMetricsAtEnd()) {
- GiraphMetrics.dumpToStdout();
+ GiraphMetrics.getInstance().dumpToStdout();
}
// Preferably would shut down the service only after
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java Wed Nov 7 18:13:10 2012
@@ -29,9 +29,10 @@ import org.apache.giraph.graph.partition
import org.apache.giraph.graph.partition.PartitionStats;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.MetricGroup;
-import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.SystemTime;
import org.apache.giraph.utils.Time;
+import org.apache.giraph.utils.Times;
+import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.TimedLogger;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -59,6 +60,8 @@ import java.util.concurrent.Callable;
*/
public class ComputeCallable<I extends WritableComparable, V extends Writable,
E extends Writable, M extends Writable> implements Callable {
+ /** Name of timer for compute call */
+ public static final String TIMER_COMPUTE_ONE = "compute-one";
/** Class logger */
private static final Logger LOG = Logger.getLogger(ComputeCallable.class);
/** Class time object */
@@ -83,9 +86,9 @@ public class ComputeCallable<I extends W
/** Get the start time in nanos */
private final long startNanos = TIME.getNanoseconds();
- // Metrics
+ // Per-Superstep Metrics
/** Timer for single compute() call */
- private Timer computeOneTimer;
+ private final Timer computeOneTimer;
/**
* Constructor
@@ -111,9 +114,11 @@ public class ComputeCallable<I extends W
// Will be replaced later in call() for locality
this.graphState = graphState;
- // Metrics
- computeOneTimer = GiraphMetrics.getTimer(MetricGroup.COMPUTE,
- "compute-one");
+ GiraphMetrics metrics = GiraphMetrics.getInstance();
+ // Normally we would use ResetSuperstepMetricsObserver but this class is
+ // not long-lived, so just instantiating in the constructor is good enough.
+ computeOneTimer = metrics.perSuperstep().getTimer(MetricGroup.COMPUTE,
+ TIMER_COMPUTE_ONE);
}
@Override
@@ -155,7 +160,7 @@ public class ComputeCallable<I extends W
}
if (LOG.isInfoEnabled()) {
- float seconds = TIME.getNanosecondsSince(startNanos) /
+ float seconds = Times.getNanosSince(TIME, startNanos) /
Time.NS_PER_SECOND_AS_FLOAT;
LOG.info("call: Computation took " + seconds + " secs for " +
partitionStatsList.size() + " partitions on superstep " +
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallable.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallable.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallable.java Wed Nov 7 18:13:10 2012
@@ -20,6 +20,7 @@ package org.apache.giraph.graph;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.GiraphMetricsRegistry;
import org.apache.giraph.metrics.MetricGroup;
import org.apache.giraph.utils.LoggerUtils;
import org.apache.giraph.utils.MemoryUtils;
@@ -89,8 +90,9 @@ public class EdgeInputSplitsCallable<I e
inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
// Initialize Metrics
- edgesLoadedCounter = GiraphMetrics.getCounter(MetricGroup.IO,
- "edges-loaded");
+ GiraphMetricsRegistry jobMetrics = GiraphMetrics.getInstance().perJob();
+ edgesLoadedCounter = jobMetrics.getCounter(MetricGroup.IO,
+ COUNTER_EDGES_LOADED);
}
/**
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java Wed Nov 7 18:13:10 2012
@@ -26,10 +26,17 @@ import org.apache.giraph.comm.messages.M
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.giraph.graph.partition.PartitionStats;
import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.GiraphMetricsRegistry;
import org.apache.giraph.metrics.MetricGroup;
+import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
+import org.apache.giraph.metrics.SuperstepMetricsRegistry;
+import org.apache.giraph.metrics.ValueGauge;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.SystemTime;
+import org.apache.giraph.utils.Time;
+import org.apache.giraph.utils.Times;
import org.apache.giraph.zk.ZooKeeperManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
@@ -44,8 +51,6 @@ import org.apache.log4j.PatternLayout;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.yammer.metrics.core.Timer;
-import com.yammer.metrics.core.TimerContext;
import java.io.IOException;
import java.lang.reflect.Type;
@@ -74,11 +79,24 @@ import java.util.concurrent.Future;
@SuppressWarnings("rawtypes")
public class GraphMapper<I extends WritableComparable, V extends Writable,
E extends Writable, M extends Writable> extends
- Mapper<Object, Object, Object, Object> {
+ Mapper<Object, Object, Object, Object> implements
+ ResetSuperstepMetricsObserver {
static {
Configuration.addDefaultResource("giraph-site.xml");
}
+ /** Name of metric for superstep time in msec */
+ public static final String GAUGE_SUPERSTEP_TIME = "superstep-time-ms";
+ /** Name of metric for compute on all vertices in msec */
+ public static final String GAUGE_COMPUTE_ALL = "compute-all-ms";
+ /** Name of metric for time from begin compute to first message sent */
+ public static final String GAUGE_TIME_TO_FIRST_MSG =
+ "time-to-first-message-ms";
+ /** Name of metric for time from first message till last message flushed */
+ public static final String GAUGE_COMMUNICATION_TIME = "communication-time-ms";
+
+ /** Time instance used for timing in this class */
+ private static final Time TIME = SystemTime.getInstance();
/** Class logger */
private static final Logger LOG = Logger.getLogger(GraphMapper.class);
/** Coordination service worker */
@@ -102,23 +120,27 @@ public class GraphMapper<I extends Writa
/** Total number of edges in the graph (at this time) */
private long numEdges = -1;
- // Metrics
- /** Timer for how long superstep took */
- private Timer superstepTimer;
- /** Timer for all compute() calls in a superstep */
- private Timer computeAllTimer;
- /** Timer for exchanging vertexes */
- private Timer exchangeVertexPartitionsTimer;
+ // Per-Job Metrics
+ /** Timer in msec for WorkerContext#preApplication() */
+ private ValueGauge<Long> wcPreAppMs;
+ /** Timer in msec for WorkerContext#postApplication() */
+ private ValueGauge<Long> wcPostAppMs;
+
+ // Per-Superstep Metrics
+ /** Time in msec for how long superstep took */
+ private ValueGauge<Long> superstepMs;
+ /** Time in msec for all compute() calls in a superstep */
+ private ValueGauge<Long> computeAllMs;
+ /** Time in msec when computation started */
+ private long computeAllBeginMs;
/** Milliseconds from starting compute to sending first message */
- private Timer timeToFirstMessage;
- /** Timer context used for computer msec from compute to first message */
- private volatile TimerContext timeToFirstMessageContext;
+ private ValueGauge<Long> msecToFirstMessage;
+ /** Time in msec at which the first message was sent */
+ private long firstMessageTimeMs;
/** Time from first sent message till last message flushed. */
- private Timer communicationTimer;
- /** Timer context for communication timer. */
- private TimerContext communicationTimerContext;
- /** Lock for notifySentMessages() to make it thread safe */
- private Object notifySentMsgLock = new Object();
+ private ValueGauge<Long> communicationTimer;
+ /** Timer in msec for WorkerContext#preSuperstep() */
+ private ValueGauge<Long> wcPreSuperstepMs;
/** What kinds of functions to run on this mapper */
public enum MapFunctions {
@@ -314,11 +336,9 @@ public class GraphMapper<I extends Writa
// Set up GiraphMetrics
GiraphMetrics.init(context);
- if (LOG.isInfoEnabled()) {
- LOG.info("setup: Initialized metrics system");
- }
+ GiraphMetrics.getInstance().addSuperstepResetObserver(this);
+ initJobMetrics();
MemoryUtils.initMetrics();
- initMetrics();
// Do some initial setup (possibly starting up a Zookeeper service)
context.setStatus("setup: Initializing Zookeeper services.");
@@ -422,19 +442,28 @@ public class GraphMapper<I extends Writa
}
/**
- * Initialize Metrics used by this class.
+ * Initialize job-level metrics used by this class.
*/
- private void initMetrics() {
- superstepTimer = GiraphMetrics.getTimer(MetricGroup.COMPUTE,
- "superstep-time");
- computeAllTimer = GiraphMetrics.getTimer(MetricGroup.COMPUTE,
- "compute-all");
- exchangeVertexPartitionsTimer = GiraphMetrics.getTimer(MetricGroup.NETWORK,
- "exchange-vertex-partitions");
- timeToFirstMessage = GiraphMetrics.getTimer(MetricGroup.COMPUTE,
- "time-to-first-message");
- communicationTimer = GiraphMetrics.getTimer(MetricGroup.NETWORK,
- "communication-time");
+ private void initJobMetrics() {
+ GiraphMetricsRegistry jobMetrics = GiraphMetrics.getInstance().perJob();
+ wcPreAppMs = new ValueGauge<Long>(jobMetrics, MetricGroup.USER,
+ "worker-context-pre-app-ms");
+ wcPostAppMs = new ValueGauge<Long>(jobMetrics, MetricGroup.USER,
+ "worker-context-post-app-ms");
+ }
+
+ @Override
+ public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
+ superstepMs = new ValueGauge<Long>(superstepMetrics, MetricGroup.COMPUTE,
+ GAUGE_SUPERSTEP_TIME);
+ computeAllMs = new ValueGauge<Long>(superstepMetrics, MetricGroup.COMPUTE,
+ GAUGE_COMPUTE_ALL);
+ msecToFirstMessage = new ValueGauge<Long>(superstepMetrics,
+ MetricGroup.NETWORK, GAUGE_TIME_TO_FIRST_MSG);
+ communicationTimer = new ValueGauge<Long>(superstepMetrics,
+ MetricGroup.NETWORK, GAUGE_COMMUNICATION_TIME);
+ wcPreSuperstepMs = new ValueGauge<Long>(superstepMetrics,
+ MetricGroup.USER, "worker-context-pre-superstep-ms");
}
/**
@@ -442,14 +471,13 @@ public class GraphMapper<I extends Writa
*/
public void notifySentMessages() {
// We are tracking the time between when the compute started and the first
- // message get sent. We use null to flag that we have already recorded it.
- TimerContext tmp = timeToFirstMessageContext;
- if (tmp != null) {
- synchronized (notifySentMsgLock) {
- if (timeToFirstMessageContext != null) {
- timeToFirstMessageContext.stop();
- timeToFirstMessageContext = null;
- communicationTimerContext = communicationTimer.time();
+ // message gets sent. We use zero to flag that we have not yet recorded it.
+ long tmp = firstMessageTimeMs;
+ if (tmp == 0) {
+ synchronized (msecToFirstMessage) {
+ if (firstMessageTimeMs == 0) {
+ firstMessageTimeMs = TIME.getMilliseconds();
+ msecToFirstMessage.set(firstMessageTimeMs - computeAllBeginMs);
}
}
}
@@ -460,9 +488,10 @@ public class GraphMapper<I extends Writa
* and are done waiting for all messages to send.
*/
public void notifyFinishedCommunication() {
- if (communicationTimerContext != null) {
- communicationTimerContext.stop();
- communicationTimerContext = null;
+ if (firstMessageTimeMs != 0) {
+ long commTimeMs = Times.getMsSince(TIME, firstMessageTimeMs);
+ communicationTimer.set(commTimeMs);
+ firstMessageTimeMs = 0;
}
}
@@ -479,6 +508,9 @@ public class GraphMapper<I extends Writa
return;
}
+ GiraphMetrics.getInstance().
+ resetSuperstepMetrics(BspService.INPUT_SUPERSTEP);
+
if ((mapFunctions == MapFunctions.MASTER_ZOOKEEPER_ONLY) ||
(mapFunctions == MapFunctions.MASTER_ONLY)) {
if (LOG.isInfoEnabled()) {
@@ -507,6 +539,8 @@ public class GraphMapper<I extends Writa
serviceWorker.getWorkerContext().setGraphState(
new GraphState<I, V, E, M>(serviceWorker.getSuperstep(),
numVertices, numEdges, context, this, null, aggregatorUsage));
+
+ long preAppBeginMs = TIME.getMilliseconds();
try {
serviceWorker.getWorkerContext().preApplication();
} catch (InstantiationException e) {
@@ -518,6 +552,7 @@ public class GraphMapper<I extends Writa
throw new RuntimeException(
"map: preApplication failed in access", e);
}
+ wcPreAppMs.set(Times.getMsSince(TIME, preAppBeginMs));
context.progress();
List<PartitionStats> partitionStatsList =
@@ -527,7 +562,9 @@ public class GraphMapper<I extends Writa
FinishedSuperstepStats finishedSuperstepStats = null;
do {
final long superstep = serviceWorker.getSuperstep();
- TimerContext superstepTimerContext = superstepTimer.time();
+ GiraphMetrics.getInstance().resetSuperstepMetrics(superstep);
+
+ long superstepBeginMs = TIME.getMilliseconds();
GraphState<I, V, E, M> graphState =
new GraphState<I, V, E, M>(superstep, numVertices, numEdges,
@@ -541,9 +578,7 @@ public class GraphMapper<I extends Writa
}
context.progress();
- TimerContext exchangeTimerContext = exchangeVertexPartitionsTimer.time();
serviceWorker.exchangeVertexPartitions(masterAssignedPartitionOwners);
- exchangeTimerContext.stop();
context.progress();
@@ -566,7 +601,9 @@ public class GraphMapper<I extends Writa
serviceWorker.prepareSuperstep();
serviceWorker.getWorkerContext().setGraphState(graphState);
+ long preSuperstepBeginMs = TIME.getMilliseconds();
serviceWorker.getWorkerContext().preSuperstep();
+ wcPreSuperstepMs.set(Times.getMsSince(TIME, preSuperstepBeginMs));
context.progress();
MessageStoreByPartition<I, M> messageStore =
@@ -590,8 +627,7 @@ public class GraphMapper<I extends Writa
computePartitionIdQueue.add(partitionId);
}
- TimerContext computeAllTimerContext = computeAllTimer.time();
- timeToFirstMessageContext = timeToFirstMessage.time();
+ computeAllBeginMs = TIME.getMilliseconds();
ExecutorService partitionExecutor =
Executors.newFixedThreadPool(numThreads,
@@ -617,14 +653,19 @@ public class GraphMapper<I extends Writa
}
partitionExecutor.shutdown();
- computeAllTimerContext.stop();
+ computeAllMs.set(Times.getMsSince(TIME, computeAllBeginMs));
}
finishedSuperstepStats =
serviceWorker.finishSuperstep(graphState, partitionStatsList);
numVertices = finishedSuperstepStats.getVertexCount();
numEdges = finishedSuperstepStats.getEdgeCount();
- superstepTimerContext.stop();
+
+ superstepMs.set(Times.getMsSince(TIME, superstepBeginMs));
+ if (conf.printSuperstepMetrics()) {
+ GiraphMetrics.getInstance().perSuperstep().printSummary();
+ }
+
} while (!finishedSuperstepStats.getAllVerticesHalted());
if (LOG.isInfoEnabled()) {
LOG.info("map: BSP application done (global vertices marked done)");
@@ -633,7 +674,9 @@ public class GraphMapper<I extends Writa
serviceWorker.getWorkerContext().setGraphState(
new GraphState<I, V, E, M>(serviceWorker.getSuperstep(),
numVertices, numEdges, context, this, null, aggregatorUsage));
+ long postAppBeginMs = TIME.getMilliseconds();
serviceWorker.getWorkerContext().postApplication();
+ wcPostAppMs.set(Times.getMsSince(TIME, postAppBeginMs));
context.progress();
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java Wed Nov 7 18:13:10 2012
@@ -23,6 +23,7 @@ import org.apache.giraph.comm.WorkerClie
import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
import org.apache.giraph.utils.SystemTime;
import org.apache.giraph.utils.Time;
+import org.apache.giraph.utils.Times;
import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -55,9 +56,12 @@ import java.util.concurrent.Callable;
public abstract class InputSplitsCallable<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
implements Callable<VertexEdgeCount> {
+ /** Name of counter for vertices loaded */
+ public static final String COUNTER_VERTICES_LOADED = "vertices-loaded";
+ /** Name of counter for edges loaded */
+ public static final String COUNTER_EDGES_LOADED = "edges-loaded";
/** Class logger */
- private static final Logger LOG =
- Logger.getLogger(InputSplitsCallable.class);
+ private static final Logger LOG = Logger.getLogger(InputSplitsCallable.class);
/** Class time object */
private static final Time TIME = SystemTime.getInstance();
/** Configuration */
@@ -180,7 +184,7 @@ public abstract class InputSplitsCallabl
}
if (LOG.isInfoEnabled()) {
- float seconds = TIME.getNanosecondsSince(startNanos) /
+ float seconds = Times.getNanosSince(TIME, startNanos) /
Time.NS_PER_SECOND_AS_FLOAT;
float verticesPerSecond = vertexEdgeCount.getVertexCount() / seconds;
float edgesPerSecond = vertexEdgeCount.getEdgeCount() / seconds;
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java Wed Nov 7 18:13:10 2012
@@ -22,6 +22,7 @@ import org.apache.giraph.GiraphConfigura
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.bsp.SuperstepState;
+import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper.Context;
@@ -110,6 +111,7 @@ public class MasterThread<I extends Writ
while (superstepState != SuperstepState.ALL_SUPERSTEPS_DONE) {
long startSuperstepMillis = System.currentTimeMillis();
cachedSuperstep = bspServiceMaster.getSuperstep();
+ GiraphMetrics.getInstance().resetSuperstepMetrics(cachedSuperstep);
superstepState = bspServiceMaster.coordinateSuperstep();
long superstepMillis = System.currentTimeMillis() -
startSuperstepMillis;
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java Wed Nov 7 18:13:10 2012
@@ -21,6 +21,7 @@ package org.apache.giraph.graph;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.GiraphMetricsRegistry;
import org.apache.giraph.metrics.MetricGroup;
import org.apache.giraph.utils.LoggerUtils;
import org.apache.giraph.utils.MemoryUtils;
@@ -97,10 +98,11 @@ public class VertexInputSplitsCallable<I
this.bspServiceWorker = bspServiceWorker;
// Initialize Metrics
- verticesLoadedCounter = GiraphMetrics.getCounter(MetricGroup.IO,
- "vertices-loaded");
- edgesLoadedCounter = GiraphMetrics.getCounter(MetricGroup.IO,
- "edges-loaded");
+ GiraphMetricsRegistry jobMetrics = GiraphMetrics.getInstance().perJob();
+ verticesLoadedCounter = jobMetrics.getCounter(MetricGroup.IO,
+ COUNTER_VERTICES_LOADED);
+ edgesLoadedCounter = jobMetrics.getCounter(MetricGroup.IO,
+ COUNTER_EDGES_LOADED);
}
/**
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java Wed Nov 7 18:13:10 2012
@@ -17,224 +17,127 @@
*/
package org.apache.giraph.metrics;
+import org.apache.giraph.graph.BspService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Mapper;
-import com.google.common.base.Joiner;
-import com.yammer.metrics.core.Counter;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.core.Histogram;
-import com.yammer.metrics.core.Meter;
-import com.yammer.metrics.core.MetricName;
-import com.yammer.metrics.core.MetricPredicate;
-import com.yammer.metrics.core.MetricsRegistry;
-import com.yammer.metrics.core.Timer;
-import com.yammer.metrics.reporting.ConsoleReporter;
-import com.yammer.metrics.reporting.JmxReporter;
+import com.google.common.collect.Lists;
import java.io.PrintStream;
-import java.util.concurrent.TimeUnit;
+import java.util.List;
/**
- * Wrapper around {@link MetricsRegistry} to register metrics within a Giraph
- * job. After initializing, users can add {@link com.yammer.metrics.core.Metric}
- * and have it automatically piped into the configured outputs.
+ * Top level metrics class for using Yammer's metrics in Giraph.
*/
public class GiraphMetrics {
- /** Enable the Metrics system **/
- public static final String ENABLE = "giraph.metrics.enable";
+ /** Singleton instance for everyone to use */
+ private static GiraphMetrics INSTANCE = new GiraphMetrics();
- /** Enable the metrics on the console **/
- public static final String CONSOLE_ENABLE = "giraph.metrics.console.enable";
+ /** registry for per-superstep metrics */
+ private final SuperstepMetricsRegistry perSuperstep;
- /** Time period for metrics **/
- public static final String CONSOLE_PERIOD = "giraph.metrics.console.period";
+ /** registry for per-job metrics */
+ private final GiraphMetricsRegistry perJob;
- /** @{link TimeUnit} for metrics time period **/
- public static final String CONSOLE_TIME_UNIT =
- "giraph.metrics.console.time.unit";
-
- /** Whether to dump all metrics when the job finishes */
- public static final String DUMP_AT_END = "giraph.metrics.dump.at.end";
-
- /** Use the Job ID as the group of the metrics */
- private static String JOB_ID = "";
-
- /** Has the metrics system be initialized? **/
- private static boolean INITED = false;
-
- /** The registry of metrics **/
- private static MetricsRegistry REGISTRY = new EmptyMetricsRegistry();
- /** The reporter for JMX **/
- private static JmxReporter REPORTER;
-
- /** Well, this is a private constructor... **/
- private GiraphMetrics() { }
+ /** observers for per-superstep metrics re-initialization */
+ private final List<ResetSuperstepMetricsObserver> observers =
+ Lists.newArrayList();
/**
- * Initialize the GiraphMetrics
- *
- * @param context Mapper's context
+ * Initialize no-op registry that creates no-op metrics.
*/
- public static synchronized void init(Mapper.Context context) {
- if (INITED) {
- return;
- }
-
- Configuration conf = context.getConfiguration();
- if (conf.getBoolean(ENABLE, false)) {
- REGISTRY = new MetricsRegistry();
- REPORTER = new JmxReporter(REGISTRY);
- REPORTER.start();
- JOB_ID = context.getJobID().toString();
- initConsole(conf);
- }
-
- INITED = true;
+ private GiraphMetrics() {
+ perJob = new GiraphMetricsRegistry();
+ perSuperstep = new SuperstepMetricsRegistry();
}
/**
- * Initialize console printing
+ * Initialize GiraphMetrics with Hadoop Context
*
- * @param conf Configuration object used by this job
+ * @param context Hadoop Context to use.
*/
- private static void initConsole(Configuration conf) {
- if (conf.getBoolean(CONSOLE_ENABLE, false)) {
- String timeUnitString = conf.get(CONSOLE_TIME_UNIT, "SECONDS");
- TimeUnit timeUnit;
- try {
- timeUnit = TimeUnit.valueOf(timeUnitString.toUpperCase());
- } catch (IllegalArgumentException iae) {
- String values = Joiner.on(",").join(TimeUnit.values());
- throw new IllegalArgumentException("Unable to parse " + timeUnitString +
- " as value for " + CONSOLE_TIME_UNIT + ". Must be " +
- "one of: " + values);
- }
-
- int period = conf.getInt(CONSOLE_PERIOD, 90);
- ConsoleReporter.enable(REGISTRY, period, timeUnit);
- }
+ private GiraphMetrics(Mapper.Context context) {
+ Configuration conf = context.getConfiguration();
+ perJob = new GiraphMetricsRegistry(conf, "giraph.job");
+ perSuperstep = new SuperstepMetricsRegistry(conf,
+ BspService.INPUT_SUPERSTEP);
}
/**
- * Dump all metrics to output stream provided.
+ * Get singleton instance of GiraphMetrics.
*
- * @param out PrintStream to dump to.
+ * @return GiraphMetrics singleton instance
*/
- public static void dumpToStream(PrintStream out) {
- new ConsoleReporter(REGISTRY, out, MetricPredicate.ALL).run();
+ public static GiraphMetrics getInstance() {
+ return INSTANCE;
}
/**
- * Dump all metrics to stdout.
- */
- public static void dumpToStdout() {
- dumpToStream(System.out);
- }
-
- /**
- * Create a MetricName using the job ID, group, and name.
- * @param group what type of metric this is
- * @param name String name given to metric
- * @return MetricName for use with MetricsRegistry
- */
- private static MetricName makeMetricName(MetricGroup group, String name) {
- return new MetricName(JOB_ID, group.toString().toLowerCase(), name);
- }
-
- /**
- * Creates a new {@link Counter} and registers it under the given group
- * and name.
+ * Initialize singleton instance of GiraphMetrics.
*
- * @param group what type of metric this is
- * @param name the name of the metric
- * @return a new {@link Counter}
+ * @param context Hadoop Context to use.
*/
- public static Counter getCounter(MetricGroup group, String name) {
- return REGISTRY.newCounter(makeMetricName(group, name));
+ public static void init(Mapper.Context context) {
+ INSTANCE = new GiraphMetrics(context);
}
/**
- * Given a new {@link Gauge}, registers it under the given group and name.
+ * Get per-job metrics.
*
- * @param group what type of metric this is
- * @param name the name of the metric
- * @param metric the metric
- * @param <T> the type of the value returned by the metric
- * @return {@code metric}
+ * @return per-job GiraphMetricsRegistry
*/
- public static <T> Gauge<T> getGauge(MetricGroup group, String name,
- Gauge<T> metric) {
- return REGISTRY.newGauge(makeMetricName(group, name), metric);
+ public GiraphMetricsRegistry perJob() {
+ return perJob;
}
/**
- * Creates a new non-biased {@link Histogram} and registers it under the given
- * group and name.
+ * Get per-superstep metrics.
*
- * @param group what type of metric this is
- * @param name the name of the metric
- * @return a new {@link Histogram}
+ * @return per-superstep SuperstepMetricsRegistry
*/
- public static Histogram getHistogram(MetricGroup group, String name) {
- return REGISTRY.newHistogram(makeMetricName(group, name), false);
+ public SuperstepMetricsRegistry perSuperstep() {
+ return perSuperstep;
}
/**
- * Creates a new {@link Histogram} and registers it under the given group
- * and name.
+ * Anyone using per-superstep counters needs to re-initialize their Metrics
+ * object on each new superstep. Otherwise they will always be updating just
+ * one counter. This method allows people to easily register a callback for
+ * when they should do the re-initializing.
*
- * @param group what type of metric this is
- * @param name the name of the metric
- * @param biased whether or not the histogram should be biased
- * @return a new {@link Histogram}
+ * @param observer observer to notify on each new superstep
*/
- public static Histogram getHistogram(MetricGroup group, String name,
- boolean biased) {
- return REGISTRY.newHistogram(makeMetricName(group, name), biased);
+ public void addSuperstepResetObserver(
+ ResetSuperstepMetricsObserver observer) {
+ observers.add(observer);
}
/**
- * Creates a new {@link Meter} and registers it under the given group
- * and name.
+ * Reset the per-superstep MetricsRegistry
*
- * @param group what type of metric this is
- * @param name the name of the metric
- * @param eventType the plural name of the type of events the meter is
- * measuring (e.g., {@code "requests"})
- * @param timeUnit the rate unit of the new meter
- * @return a new {@link Meter}
+ * @param superstep long number of superstep
*/
- public static Meter getMeter(MetricGroup group, String name, String eventType,
- TimeUnit timeUnit) {
- return REGISTRY.newMeter(makeMetricName(group, name), eventType, timeUnit);
+ public void resetSuperstepMetrics(long superstep) {
+ perSuperstep.setSuperstep(superstep);
+ for (ResetSuperstepMetricsObserver observer : observers) {
+ observer.newSuperstep(perSuperstep);
+ }
}
/**
- * Creates a new {@link Timer} and registers it under the given group and
- * name, measuring elapsed time in milliseconds and invocations per second.
+ * Dump all metrics to output stream provided.
*
- * @param group what type of metric this is
- * @param name the name of the metric
- * @return a new {@link Timer}
+ * @param out PrintStream to dump to.
*/
- public static Timer getTimer(MetricGroup group, String name) {
- return getTimer(group, name, TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
+ public void dumpToStream(PrintStream out) {
+ perJob.printToStream(out);
+ perSuperstep.printToStream(out);
}
/**
- * Creates a new {@link Timer} and registers it under the given
- * group and name.
- *
- * @param group what type of metric this is
- * @param name the name of the metric
- * @param durationUnit the duration scale unit of the new timer
- * @param rateUnit the rate scale unit of the new timer
- * @return a new {@link Timer}
+ * Dump all metrics to stdout.
*/
- public static Timer getTimer(MetricGroup group, String name,
- TimeUnit durationUnit, TimeUnit rateUnit) {
- return REGISTRY.newTimer(makeMetricName(group, name),
- durationUnit, rateUnit);
+ public void dumpToStdout() {
+ dumpToStream(System.out);
}
}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/GiraphMetricsRegistry.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/GiraphMetricsRegistry.java?rev=1406744&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/GiraphMetricsRegistry.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/GiraphMetricsRegistry.java Wed Nov 7 18:13:10 2012
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.metrics;
+
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.hadoop.conf.Configuration;
+
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricPredicate;
+import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.core.Timer;
+import com.yammer.metrics.reporting.ConsoleReporter;
+import com.yammer.metrics.reporting.JmxReporter;
+
+import java.io.PrintStream;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Holder for a Yammer MetricsRegistry and, when enabled, its JmxReporter.
+ */
+public class GiraphMetricsRegistry {
+  /** Group name for metrics; defaults to "" as MetricName rejects null */
+  private String groupName = "";
+  /** Internal Yammer registry used */
+  private final MetricsRegistry registry;
+  /** JmxReporter that sends metrics to JMX; null for the empty registry */
+  private final JmxReporter jmxReporter;
+
+  /**
+   * Create a registry backed by EmptyMetricsRegistry with an empty group name.
+   */
+  public GiraphMetricsRegistry() {
+    registry = new EmptyMetricsRegistry();
+    jmxReporter = null;
+  }
+
+  /**
+   * Create registry with Hadoop Configuration and group to use for metrics.
+   *
+   * @param conf Hadoop Configuration; METRICS_ENABLE selects a live registry.
+   * @param groupName String group to use for metrics.
+   */
+  public GiraphMetricsRegistry(Configuration conf, String groupName) {
+    this.groupName = groupName;
+    if (conf.getBoolean(GiraphConfiguration.METRICS_ENABLE, false)) {
+      registry = new MetricsRegistry();
+      jmxReporter = new JmxReporter(registry);
+      jmxReporter.start();
+    } else {
+      registry = new EmptyMetricsRegistry();
+      jmxReporter = null;
+    }
+  }
+
+  /**
+   * Set group name used by this MetricsRegistry. Used for incrementing
+   * superstep number to create a new hierarchy of metrics per superstep.
+   *
+   * @param groupName String group name to use for metrics created from now on.
+   */
+  protected void setGroupName(String groupName) {
+    this.groupName = groupName;
+  }
+
+  /**
+   * Write a blank line, then dump all the metrics to the PrintStream provided.
+   *
+   * @param out PrintStream to write metrics to.
+   */
+  public void printToStream(PrintStream out) {
+    out.println("");
+    new ConsoleReporter(registry, out, MetricPredicate.ALL).run();
+  }
+
+  /**
+   * Get internal MetricsRegistry used.
+   *
+   * @return MetricsRegistry being used.
+   */
+  protected MetricsRegistry getInternalRegistry() {
+    return registry;
+  }
+
+  /**
+   * Creates a new {@link com.yammer.metrics.core.Counter} and registers it
+   * under the given group and name.
+   *
+   * @param group what type of metric this is
+   * @param name the name of the metric
+   * @return a new {@link com.yammer.metrics.core.Counter}
+   */
+  public Counter getCounter(MetricGroup group, String name) {
+    return registry.newCounter(makeMetricName(group, name));
+  }
+
+  /**
+   * Given a new {@link com.yammer.metrics.core.Gauge}, registers it under the
+   * given group and name.
+   *
+   * @param group what type of metric this is
+   * @param name the name of the metric
+   * @param metric the metric
+   * @param <T> the type of the value returned by the metric
+   * @return whatever the internal registry returns for {@code metric}
+   */
+  public <T> Gauge<T> getGauge(MetricGroup group, String name,
+                               Gauge<T> metric) {
+    return registry.newGauge(makeMetricName(group, name), metric);
+  }
+
+  /**
+   * Creates a new non-biased {@link com.yammer.metrics.core.Histogram} and
+   * registers it under the given group and name.
+   *
+   * @param group what type of metric this is
+   * @param name the name of the metric
+   * @return a new {@link com.yammer.metrics.core.Histogram}
+   */
+  public Histogram getHistogram(MetricGroup group, String name) {
+    return getHistogram(group, name, false);
+  }
+
+  /**
+   * Creates a new {@link Histogram} and registers it under the given group
+   * and name.
+   *
+   * @param group what type of metric this is
+   * @param name the name of the metric
+   * @param biased whether or not the histogram should be biased
+   * @return a new {@link Histogram}
+   */
+  public Histogram getHistogram(MetricGroup group, String name,
+                                boolean biased) {
+    return registry.newHistogram(makeMetricName(group, name), biased);
+  }
+
+  /**
+   * Creates a new {@link com.yammer.metrics.core.Meter} and registers it under
+   * the given group and name.
+   *
+   * @param group what type of metric this is
+   * @param name the name of the metric
+   * @param eventType the plural name of the type of events the meter is
+   *                  measuring (e.g., {@code "requests"})
+   * @param timeUnit the rate unit of the new meter
+   * @return a new {@link com.yammer.metrics.core.Meter}
+   */
+  public Meter getMeter(MetricGroup group, String name, String eventType,
+                        TimeUnit timeUnit) {
+    return registry.newMeter(makeMetricName(group, name), eventType, timeUnit);
+  }
+
+  /**
+   * Creates a new {@link com.yammer.metrics.core.Timer} and registers it under
+   * the given group and name, measuring elapsed time in milliseconds and
+   * invocations per second.
+   *
+   * @param group what type of metric this is
+   * @param name the name of the metric
+   * @return a new {@link com.yammer.metrics.core.Timer}
+   */
+  public Timer getTimer(MetricGroup group, String name) {
+    return getTimer(group, name, TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Creates a new {@link Timer} and registers it under the given
+   * group and name.
+   *
+   * @param group what type of metric this is
+   * @param name the name of the metric
+   * @param durationUnit the duration scale unit of the new timer
+   * @param rateUnit the rate scale unit of the new timer
+   * @return a new {@link Timer}
+   */
+  public Timer getTimer(MetricGroup group, String name,
+                        TimeUnit durationUnit, TimeUnit rateUnit) {
+    return registry.newTimer(makeMetricName(group, name),
+        durationUnit, rateUnit);
+  }
+
+  /**
+   * Create a MetricName from the registry group name, metric group and name.
+   *
+   * @param group what type of metric this is; its lowercased name is the type
+   * @param name String name given to metric
+   * @return MetricName for use with MetricsRegistry
+   */
+  protected MetricName makeMetricName(MetricGroup group, String name) {
+    return new MetricName(groupName, group.toString().toLowerCase(), name);
+  }
+}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ResetSuperstepMetricsObserver.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ResetSuperstepMetricsObserver.java?rev=1406744&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ResetSuperstepMetricsObserver.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ResetSuperstepMetricsObserver.java Wed Nov 7 18:13:10 2012
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.metrics;
+
+/**
+ * Observer for users of per-superstep metrics. Notified whenever a new
+ * superstep is starting so that implementers can re-initialize their metrics.
+ */
+public interface ResetSuperstepMetricsObserver {
+ /**
+ * Called when a new superstep is starting. Re-initialize your metrics here.
+ *
+ * @param superstepMetrics SuperstepMetricsRegistry used for the new superstep.
+ */
+ void newSuperstep(SuperstepMetricsRegistry superstepMetrics);
+}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java?rev=1406744&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java Wed Nov 7 18:13:10 2012
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.metrics;
+
+import org.apache.giraph.graph.BspService;
+import org.apache.giraph.graph.BspServiceWorker;
+import org.apache.giraph.graph.ComputeCallable;
+import org.apache.giraph.graph.GraphMapper;
+import org.apache.hadoop.conf.Configuration;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Metric;
+import com.yammer.metrics.core.Timer;
+
+import java.io.PrintStream;
+
+/**
+ * GiraphMetricsRegistry whose metric group is derived from a superstep number.
+ */
+public class SuperstepMetricsRegistry extends GiraphMetricsRegistry {
+  /** Number of superstep to use for group of metrics created */
+  private long superstep = BspService.INPUT_SUPERSTEP;
+
+  /**
+   * Create no-op registry whose group is that of BspService.INPUT_SUPERSTEP.
+   */
+  public SuperstepMetricsRegistry() {
+    super.setGroupName(makeSuperstepGroupName(superstep));
+  }
+
+  /**
+   * Create with Hadoop Configuration and superstep number.
+   *
+   * @param conf Hadoop Configuration to use.
+   * @param superstep number of superstep to use as group for metrics.
+   */
+  public SuperstepMetricsRegistry(Configuration conf, long superstep) {
+    super(conf, makeSuperstepGroupName(superstep));
+    this.superstep = superstep;
+  }
+
+  /**
+   * Set superstep number used. Internally sets the group for metrics created.
+   *
+   * @param superstep long number of superstep to use.
+   */
+  public void setSuperstep(long superstep) {
+    this.superstep = superstep;
+    super.setGroupName(makeSuperstepGroupName(superstep));
+  }
+
+  /**
+   * Create group name to use for superstep.
+   *
+   * @param superstep long value of superstep to use.
+   * @return "giraph.superstep." followed by the superstep number.
+   */
+  private static String makeSuperstepGroupName(long superstep) {
+    return "giraph.superstep." + superstep;
+  }
+
+  /**
+   * Print human readable summary of superstep metrics (absent gauges: null).
+   *
+   * @param out PrintStream to write to.
+   */
+  public void printSummary(PrintStream out) {
+    Long networkMs = getGaugeValue(MetricGroup.NETWORK,
+        GraphMapper.GAUGE_COMMUNICATION_TIME);
+    Long computeAllMs = getGaugeValue(MetricGroup.COMPUTE,
+        GraphMapper.GAUGE_COMPUTE_ALL);
+    Long firstMessageMs = getGaugeValue(MetricGroup.NETWORK,
+        GraphMapper.GAUGE_TIME_TO_FIRST_MSG);
+    Long superstepMs = getGaugeValue(MetricGroup.COMPUTE,
+        GraphMapper.GAUGE_SUPERSTEP_TIME);
+    Long waitingMs = getGaugeValue(MetricGroup.NETWORK,
+        BspServiceWorker.GAUGE_WAITING_TIME);
+    Timer computeOne = getTimer(MetricGroup.COMPUTE,
+        ComputeCallable.TIMER_COMPUTE_ONE);
+    double userComputeMs = computeOne.mean() * computeOne.count();
+
+    out.println("");
+    out.println("Superstep " + superstep + ":");
+    out.println(" superstep time: " + superstepMs + " ms");
+    out.println(" time to first message: " + firstMessageMs + " ms");
+    out.println(" compute time: " + computeAllMs + " ms");
+    out.println(" user compute time: " + userComputeMs + " ms");
+    out.println(" network communication time: " + networkMs + " ms");
+    out.println(" waiting time: " + waitingMs + " ms");
+  }
+
+  /**
+   * Print human readable summary of superstep metrics to stdout.
+   */
+  public void printSummary() {
+    printSummary(System.out);
+  }
+
+  /**
+   * Get a Gauge that is already present in the MetricsRegistry
+   *
+   * @param group MetricGroup Gauge belongs to
+   * @param name String name of Gauge
+   * @param <T> value type Gauge returns (unchecked; chosen by the caller)
+   * @return Gauge<T> from MetricsRegistry, or null if none is registered
+   */
+  private <T> Gauge<T> getExistingGauge(MetricGroup group, String name) {
+    Metric found = getInternalRegistry().allMetrics().
+        get(makeMetricName(group, name));
+    return found instanceof Gauge ? (Gauge<T>) found : null;
+  }
+
+  /**
+   * Get value of Gauge that is already present in the MetricsRegistry
+   *
+   * @param group MetricGroup Gauge belongs to
+   * @param name String name of Gauge
+   * @param <T> value type Gauge returns
+   * @return T value of Gauge<T>, or null if no such Gauge is registered
+   */
+  private <T> T getGaugeValue(MetricGroup group, String name) {
+    Gauge<T> existing = getExistingGauge(group, name);
+    return existing == null ? null : existing.value();
+  }
+}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ValueGauge.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ValueGauge.java?rev=1406744&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ValueGauge.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ValueGauge.java Wed Nov 7 18:13:10 2012
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.metrics;
+
+import com.yammer.metrics.core.Gauge;
+
+/**
+ * A Gauge that holds a value supplied through {@link #set(Number)}.
+ *
+ * @param <T> type of value being held.
+ */
+public class ValueGauge<T extends Number> extends Gauge<T> {
+  /** value held by this class; null until set() is first called */
+  private T value;
+
+  /**
+   * Constructor that registers Gauge in MetricsRegistry.
+   *
+   * @param registry GiraphMetricsRegistry to use.
+   * @param group MetricGroup for Gauge.
+   * @param name String name of Gauge.
+   */
+  public ValueGauge(GiraphMetricsRegistry registry, MetricGroup group,
+                    String name) {
+    registry.getGauge(group, name, this);
+  }
+
+  @Override
+  public T value() {
+    return value;
+  }
+
+  /**
+   * Get double representation of value held.
+   *
+   * @return double value, or 0 if no value has been set yet
+   */
+  public double getDouble() {
+    return value == null ? 0.0 : value.doubleValue();
+  }
+
+  /**
+   * Get long representation of value held.
+   *
+   * @return long value, or 0 if no value has been set yet
+   */
+  public long getLong() {
+    return value == null ? 0L : value.longValue();
+  }
+
+  /**
+   * Set value held by this object.
+   *
+   * @param value value to set.
+   */
+  public void set(T value) {
+    this.value = value;
+  }
+}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/FakeTime.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/FakeTime.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/FakeTime.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/FakeTime.java Wed Nov 7 18:13:10 2012
@@ -35,31 +35,16 @@ public class FakeTime implements Time {
}
@Override
- public long getMillisecondsSince(long previousMilliseconds) {
- return getMilliseconds() - previousMilliseconds;
- }
-
- @Override
public long getNanoseconds() {
return nanosecondsSinceEpoch.get();
}
@Override
- public long getNanosecondsSince(long previousNanoseconds) {
- return getNanoseconds() - previousNanoseconds;
- }
-
- @Override
public int getSeconds() {
return (int) (nanosecondsSinceEpoch.get() / NS_PER_SECOND);
}
@Override
- public int getSecondsSince(int previousSeconds) {
- return getSeconds() - previousSeconds;
- }
-
- @Override
public Date getCurrentDate() {
return new Date(getMilliseconds());
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/MemoryUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/MemoryUtils.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/MemoryUtils.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/MemoryUtils.java Wed Nov 7 18:13:10 2012
@@ -20,6 +20,7 @@ package org.apache.giraph.utils;
import com.yammer.metrics.util.PercentGauge;
import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.GiraphMetricsRegistry;
import org.apache.giraph.metrics.MetricGroup;
/**
@@ -66,8 +67,9 @@ public class MemoryUtils {
* Initialize metrics tracked by this helper.
*/
public static void initMetrics() {
- GiraphMetrics.getGauge(MetricGroup.SYSTEM, "memory-free-pct",
- new PercentGauge() {
+ GiraphMetricsRegistry metrics = GiraphMetrics.getInstance().perJob();
+ metrics.getGauge(MetricGroup.SYSTEM, "memory-free-pct",
+ new PercentGauge() {
@Override
protected double getNumerator() {
return freeMemoryMB();
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/SystemTime.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/SystemTime.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/SystemTime.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/SystemTime.java Wed Nov 7 18:13:10 2012
@@ -36,32 +36,16 @@ public class SystemTime implements Time
}
@Override
- public long getMillisecondsSince(long previousMilliseconds) {
- return getMilliseconds() - previousMilliseconds;
- }
-
-
- @Override
public long getNanoseconds() {
return System.nanoTime();
}
@Override
- public long getNanosecondsSince(long previousNanoseconds) {
- return getNanoseconds() - previousNanoseconds;
- }
-
- @Override
public int getSeconds() {
return (int) (getMilliseconds() / MS_PER_SECOND);
}
@Override
- public int getSecondsSince(int previousSeconds) {
- return getSeconds() - previousSeconds;
- }
-
- @Override
public Date getCurrentDate() {
return new Date();
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java?rev=1406744&r1=1406743&r2=1406744&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java Wed Nov 7 18:13:10 2012
@@ -62,15 +62,6 @@ public interface Time {
long getMilliseconds();
/**
- * Convenience method to get milliseconds since a previous milliseconds
- * point.
- *
- * @param previousMilliseconds Previous milliseconds
- * @return Milliseconds elapsed since the previous milliseconds
- */
- long getMillisecondsSince(long previousMilliseconds);
-
- /**
* Get the current nanoseconds
*
* @return The difference, measured in nanoseconds, between
@@ -79,15 +70,6 @@ public interface Time {
long getNanoseconds();
/**
- * Convenience method to get nanoseconds since a previous nanoseconds
- * point.
- *
- * @param previousNanoseconds Previous nanoseconds
- * @return Nanoseconds elapsed since the previous nanoseconds
- */
- long getNanosecondsSince(long previousNanoseconds);
-
- /**
* Get the current seconds
*
* @return The difference, measured in seconds, between
@@ -96,15 +78,6 @@ public interface Time {
int getSeconds();
/**
- * Convenience method to get seconds since a previous seconds
- * point.
- *
- * @param previousSeconds Previous seconds
- * @return Seconds elapsed since the previous seconds
- */
- int getSecondsSince(int previousSeconds);
-
- /**
* Get the current date
*
* @return Current date