You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/03/08 20:59:37 UTC
svn commit: r1298541 - in /incubator/hama/trunk/core/src:
main/java/org/apache/hama/bsp/ main/java/org/apache/hama/bsp/message/
test/java/org/apache/hama/bsp/message/
Author: tjungblut
Date: Thu Mar 8 19:59:36 2012
New Revision: 1298541
URL: http://svn.apache.org/viewvc?rev=1298541&view=rev
Log:
[HAMA-485]: Fill Counters with useful information
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1298541&r1=1298540&r2=1298541&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Thu Mar 8 19:59:36 2012
@@ -161,7 +161,7 @@ public interface BSPPeer<K1, V1, K2, V2,
* @param amount A non-negative amount by which the counter is to
* be incremented.
*/
- public void incrCounter(Enum<?> key, long amount);
+ public void incrementCounter(Enum<?> key, long amount);
/**
* Increments the counter identified by the group and counter name
@@ -172,5 +172,5 @@ public interface BSPPeer<K1, V1, K2, V2,
* @param amount A non-negative amount by which the counter is to
* be incremented.
*/
- public void incrCounter(String group, String counter, long amount);
+ public void incrementCounter(String group, String counter, long amount);
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1298541&r1=1298540&r2=1298541&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Mar 8 19:59:36 2012
@@ -52,7 +52,10 @@ public final class BSPPeerImpl<K1, V1, K
private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
public static enum PeerCounter {
- SUPERSTEP_SUM, SUPERSTEPS
+ SUPERSTEP_SUM, SUPERSTEPS, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS,
+ IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED,
+ TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT,
+ COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
}
private final Configuration conf;
@@ -83,7 +86,9 @@ public final class BSPPeerImpl<K1, V1, K
private RecordWriter<K2, V2> outWriter;
private InetSocketAddress peerAddress;
+
private Counters counters;
+ private Combiner<M> combiner;
/**
* Protected default constructor for LocalBSPRunner.
@@ -105,6 +110,18 @@ public final class BSPPeerImpl<K1, V1, K
}
/**
+ * For unit test.
+ *
+ * @param conf is the configuration file.
+ * @param dfs is the Hadoop FileSystem.
+ * @param counters is the counters from outside.
+ */
+ public BSPPeerImpl(final Configuration conf, FileSystem dfs, Counters counters) {
+ this(conf, dfs);
+ this.counters = counters;
+ }
+
+ /**
* BSPPeer Constructor.
*
* BSPPeer acts on behalf of clients performing bsp() tasks.
@@ -114,6 +131,7 @@ public final class BSPPeerImpl<K1, V1, K
* @param taskId is the id that current process holds.
* @throws Exception
*/
+ @SuppressWarnings("unchecked")
public BSPPeerImpl(BSPJob job, Configuration conf, TaskAttemptID taskId,
BSPPeerProtocol umbilical, int partition, String splitClass,
BytesWritable split, Counters counters) throws Exception {
@@ -150,7 +168,13 @@ public final class BSPPeerImpl<K1, V1, K
TaskStatus.Phase.STARTING, counters));
messenger = MessageManagerFactory.getMessageManager(conf);
- messenger.init(conf, peerAddress);
+ messenger.init(this, conf, peerAddress);
+
+ final String combinerName = conf.get("bsp.combiner.class");
+ if (combinerName != null) {
+ combiner = (Combiner<M>) ReflectionUtils.newInstance(
+ conf.getClassByName(combinerName), conf);
+ }
}
@@ -200,7 +224,10 @@ public final class BSPPeerImpl<K1, V1, K
if (in != null) {
in.close();
}
- in = bspJob.getInputFormat().getRecordReader(inputSplit, bspJob);
+ in = new TrackedRecordReader<K1, V1>(bspJob.getInputFormat()
+ .getRecordReader(inputSplit, bspJob),
+ getCounter(BSPPeerImpl.PeerCounter.TASK_INPUT_RECORDS),
+ getCounter(BSPPeerImpl.PeerCounter.IO_BYTES_READ));
}
}
@@ -211,6 +238,7 @@ public final class BSPPeerImpl<K1, V1, K
@Override
public final void send(String peerName, M msg) throws IOException {
+ incrementCounter(PeerCounter.TOTAL_MESSAGES_SENT, 1L);
messenger.send(peerName, msg);
}
@@ -266,6 +294,7 @@ public final class BSPPeerImpl<K1, V1, K
@Override
public final void sync() throws IOException, SyncException,
InterruptedException {
+ long startBarrier = System.currentTimeMillis();
enterBarrier();
Iterator<Entry<InetSocketAddress, LinkedList<M>>> it = messenger
.getMessageIterator();
@@ -295,7 +324,9 @@ public final class BSPPeerImpl<K1, V1, K
leaveBarrier();
- incrCounter(PeerCounter.SUPERSTEP_SUM, 1L);
+ incrementCounter(PeerCounter.TIME_IN_SYNC_MS,
+ (System.currentTimeMillis() - startBarrier));
+ incrementCounter(PeerCounter.SUPERSTEP_SUM, 1L);
currentTaskStatus.setCounters(counters);
@@ -305,23 +336,15 @@ public final class BSPPeerImpl<K1, V1, K
}
private final BSPMessageBundle<M> combineMessages(Iterable<M> messages) {
- if (!conf.getClass("bsp.combiner.class", Combiner.class).equals(
- Combiner.class)) {
- @SuppressWarnings("unchecked")
- Combiner<M> combiner = (Combiner<M>) ReflectionUtils.newInstance(
- conf.getClass("bsp.combiner.class", Combiner.class), conf);
-
- BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+ BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+ if (combiner != null) {
bundle.addMessage(combiner.combine(messages));
-
- return bundle;
} else {
- BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
for (M message : messages) {
bundle.addMessage(message);
}
- return bundle;
}
+ return bundle;
}
protected final void enterBarrier() throws SyncException {
@@ -423,6 +446,7 @@ public final class BSPPeerImpl<K1, V1, K
@Override
public final void write(K2 key, V2 value) throws IOException {
+ incrementCounter(PeerCounter.TASK_OUTPUT_RECORDS, 1);
collector.collect(key, value);
}
@@ -461,21 +485,22 @@ public final class BSPPeerImpl<K1, V1, K
return counter;
}
+ public Counters getCounters() {
+ return counters;
+ }
+
@Override
- public final void incrCounter(Enum<?> key, long amount) {
+ public final void incrementCounter(Enum<?> key, long amount) {
if (counters != null) {
counters.incrCounter(key, amount);
}
}
@Override
- public final void incrCounter(String group, String counter, long amount) {
+ public final void incrementCounter(String group, String counter, long amount) {
if (counters != null) {
counters.incrCounter(group, counter, amount);
}
}
- public Counters getCounters() {
- return counters;
- }
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1298541&r1=1298540&r2=1298541&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Thu Mar 8 19:59:36 2012
@@ -336,8 +336,11 @@ public class LocalBSPRunner implements J
private static final ConcurrentHashMap<String, InetSocketAddress> socketCache = new ConcurrentHashMap<String, InetSocketAddress>();
private final LinkedBlockingDeque<M> localIncomingMessages = new LinkedBlockingDeque<M>();
+ private BSPPeer<?, ?, ?, ?, M> peer;
+
@Override
- public void init(Configuration conf, InetSocketAddress peerAddress) {
+ public void init(BSPPeer<?, ?, ?, ?, M> peer, Configuration conf, InetSocketAddress peerAddress) {
+ this.peer = peer;
managerMap.put(peerAddress, this);
}
@@ -367,7 +370,7 @@ public class LocalBSPRunner implements J
msgs = new LinkedList<M>();
}
msgs.add(msg);
-
+ peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
localOutgoingMessages.put(inetSocketAddress, msgs);
}
@@ -377,6 +380,7 @@ public class LocalBSPRunner implements J
throws IOException {
for (M value : bundle.getMessages()) {
managerMap.get(addr).localIncomingMessages.add(value);
+ peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
}
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java?rev=1298541&r1=1298540&r2=1298541&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java Thu Mar 8 19:59:36 2012
@@ -39,10 +39,12 @@ import org.apache.avro.ipc.specific.Spec
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
import org.apache.hama.util.BSPNetUtils;
-public class AvroMessageManagerImpl<M extends Writable> extends
+public final class AvroMessageManagerImpl<M extends Writable> extends
CompressableMessageManager<M> implements Sender<M> {
private NettyServer server = null;
@@ -55,8 +57,12 @@ public class AvroMessageManagerImpl<M ex
// this must be a synchronized implementation: this is accessed per RPC
private final ConcurrentLinkedQueue<M> localQueueForNextIteration = new ConcurrentLinkedQueue<M>();
+ private BSPPeer<?, ?, ?, ?, M> peer;
+
@Override
- public void init(Configuration conf, InetSocketAddress addr) {
+ public void init(BSPPeer<?, ?, ?, ?, M> peer, Configuration conf,
+ InetSocketAddress addr) {
+ this.peer = peer;
super.initCompression(conf);
server = new NettyServer(new SpecificResponder(Sender.class, this), addr);
}
@@ -74,8 +80,13 @@ public class AvroMessageManagerImpl<M ex
}
public void put(BSPMessageBundle<M> messages) {
- for (M message : messages.getMessages()) {
- this.localQueueForNextIteration.add(message);
+ peer.incrementCounter(
+ BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, messages.getMessages()
+ .size());
+ Iterator<M> iterator = messages.getMessages().iterator();
+ while (iterator.hasNext()) {
+ this.localQueueForNextIteration.add(iterator.next());
+ iterator.remove();
}
}
@@ -140,13 +151,17 @@ public class AvroMessageManagerImpl<M ex
private final BSPMessageBundle<M> deserializeMessage(ByteBuffer buffer)
throws IOException {
BSPMessageBundle<M> msg = new BSPMessageBundle<M>();
+ byte[] byteArray = buffer.array();
if (compressor == null) {
- ByteArrayInputStream inArray = new ByteArrayInputStream(buffer.array());
+ peer.incrementCounter(
+ BSPPeerImpl.PeerCounter.MESSAGE_BYTES_RECEIVED, byteArray.length);
+ ByteArrayInputStream inArray = new ByteArrayInputStream(byteArray);
DataInputStream in = new DataInputStream(inArray);
msg.readFields(in);
} else {
- msg = compressor
- .decompressBundle(new BSPCompressedBundle(buffer.array()));
+ peer.incrementCounter(
+ BSPPeerImpl.PeerCounter.COMPRESSED_BYTES_RECEIVED, byteArray.length);
+ msg = compressor.decompressBundle(new BSPCompressedBundle(byteArray));
}
return msg;
@@ -159,10 +174,16 @@ public class AvroMessageManagerImpl<M ex
DataOutputStream out = new DataOutputStream(outArray);
msg.write(out);
out.close();
- return ByteBuffer.wrap(outArray.toByteArray());
+ byte[] byteArray = outArray.toByteArray();
+ peer.incrementCounter(
+ BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED, byteArray.length);
+ return ByteBuffer.wrap(byteArray);
} else {
BSPCompressedBundle compMsgBundle = compressor.compressBundle(msg);
- return ByteBuffer.wrap(compMsgBundle.getData());
+ byte[] data = compMsgBundle.getData();
+ peer.incrementCounter(
+ BSPPeerImpl.PeerCounter.COMPRESSED_BYTES_SENT, data.length);
+ return ByteBuffer.wrap(data);
}
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java?rev=1298541&r1=1298540&r2=1298541&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java Thu Mar 8 19:59:36 2012
@@ -33,6 +33,8 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
import org.apache.hama.util.BSPNetUtils;
import org.apache.hama.util.CompressionUtil;
@@ -57,9 +59,11 @@ public final class HadoopMessageManagerI
private Deque<M> localQueue = new LinkedList<M>();
// this must be a synchronized implementation: this is accessed per RPC
private final ConcurrentLinkedQueue<M> localQueueForNextIteration = new ConcurrentLinkedQueue<M>();
+ private BSPPeer<?, ?, ?, ?, M> peer;
@Override
- public final void init(Configuration conf, InetSocketAddress peerAddress) {
+ public final void init(BSPPeer<?, ?, ?, ?, M> peer, Configuration conf, InetSocketAddress peerAddress) {
+ this.peer = peer;
this.conf = conf;
super.initCompression(conf);
startRPCServer(conf, peerAddress);
@@ -107,6 +111,7 @@ public final class HadoopMessageManagerI
queue = new LinkedList<M>();
}
queue.add(msg);
+ peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
outgoingQueues.put(targetPeerAddress, queue);
}
@@ -160,6 +165,8 @@ public final class HadoopMessageManagerI
@Override
public final void put(M msg) {
this.localQueueForNextIteration.add(msg);
+ peer.incrementCounter(
+ BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
}
@Override
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1298541&r1=1298540&r2=1298541&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java Thu Mar 8 19:59:36 2012
@@ -26,6 +26,7 @@ import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.BSPPeer;
/**
* This manager takes care of the messaging. It is responsible to launch a
@@ -40,7 +41,8 @@ public interface MessageManager<M extend
* @param conf
* @param peerAddress
*/
- public void init(Configuration conf, InetSocketAddress peerAddress);
+ public void init(BSPPeer<?, ?, ?, ?, M> peer, Configuration conf,
+ InetSocketAddress peerAddress);
/**
* Close is called after a task ran. Should be used to clean up things, e.g.
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java?rev=1298541&r1=1298540&r2=1298541&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java Thu Mar 8 19:59:36 2012
@@ -23,8 +23,15 @@ import java.util.Random;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.BSPPeerImpl;
+import org.apache.hama.bsp.Counters;
+import org.apache.hama.bsp.message.compress.BSPMessageCompressorFactory;
+import org.apache.hama.bsp.message.compress.SnappyCompressor;
import org.apache.hama.bsp.messages.BooleanMessage;
import org.apache.hama.bsp.messages.DoubleMessage;
import org.apache.hama.bsp.messages.IntegerMessage;
@@ -44,12 +51,18 @@ public class TestAvroMessageManager exte
Configuration conf = new Configuration();
MessageManager<Writable> messageManager = MessageManagerFactory
.getMessageManager(conf);
+ conf.set(BSPMessageCompressorFactory.COMPRESSION_CODEC_CLASS,
+ SnappyCompressor.class.getName());
assertTrue(messageManager instanceof AvroMessageManagerImpl);
InetSocketAddress peer = new InetSocketAddress(
BSPNetUtils.getCanonicalHostname(), BSPNetUtils.getFreePort());
- messageManager.init(conf, peer);
+
+ BSPPeer<?, ?, ?, ?, Writable> dummyPeer = new BSPPeerImpl<NullWritable, NullWritable, NullWritable, NullWritable, Writable>(
+ conf, FileSystem.get(conf), new Counters());
+
+ messageManager.init(dummyPeer, conf, peer);
messageManager.transfer(peer, randomBundle);
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java?rev=1298541&r1=1298540&r2=1298541&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java Thu Mar 8 19:59:36 2012
@@ -25,8 +25,13 @@ import java.util.Map.Entry;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.BSPPeerImpl;
+import org.apache.hama.bsp.Counters;
import org.apache.hama.util.BSPNetUtils;
public class TestHadoopMessageManager extends TestCase {
@@ -42,7 +47,9 @@ public class TestHadoopMessageManager ex
InetSocketAddress peer = new InetSocketAddress(
BSPNetUtils.getCanonicalHostname(), BSPNetUtils.getFreePort());
- messageManager.init(conf, peer);
+ BSPPeer<?, ?, ?, ?, IntWritable> dummyPeer = new BSPPeerImpl<NullWritable, NullWritable, NullWritable, NullWritable, IntWritable>(
+ conf, FileSystem.get(conf), new Counters());
+ messageManager.init(dummyPeer, conf, peer);
String peerName = peer.getHostName() + ":" + peer.getPort();
messageManager.send(peerName, new IntWritable(1337));