You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/03/08 20:59:37 UTC

svn commit: r1298541 - in /incubator/hama/trunk/core/src: main/java/org/apache/hama/bsp/ main/java/org/apache/hama/bsp/message/ test/java/org/apache/hama/bsp/message/

Author: tjungblut
Date: Thu Mar  8 19:59:36 2012
New Revision: 1298541

URL: http://svn.apache.org/viewvc?rev=1298541&view=rev
Log:
[HAMA-485]: Fill Counters with useful information

Modified:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1298541&r1=1298540&r2=1298541&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Thu Mar  8 19:59:36 2012
@@ -161,7 +161,7 @@ public interface BSPPeer<K1, V1, K2, V2,
    * @param amount A non-negative amount by which the counter is to 
    *               be incremented.
    */
-  public void incrCounter(Enum<?> key, long amount);
+  public void incrementCounter(Enum<?> key, long amount);
   
   /**
    * Increments the counter identified by the group and counter name
@@ -172,5 +172,5 @@ public interface BSPPeer<K1, V1, K2, V2,
    * @param amount A non-negative amount by which the counter is to 
    *               be incremented.
    */
-  public void incrCounter(String group, String counter, long amount);
+  public void incrementCounter(String group, String counter, long amount);
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1298541&r1=1298540&r2=1298541&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Mar  8 19:59:36 2012
@@ -52,7 +52,10 @@ public final class BSPPeerImpl<K1, V1, K
   private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
 
   public static enum PeerCounter {
-    SUPERSTEP_SUM, SUPERSTEPS
+    SUPERSTEP_SUM, SUPERSTEPS, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS,
+    IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED,
+    TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT,
+    COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
   }
 
   private final Configuration conf;
@@ -83,7 +86,9 @@ public final class BSPPeerImpl<K1, V1, K
   private RecordWriter<K2, V2> outWriter;
 
   private InetSocketAddress peerAddress;
+
   private Counters counters;
+  private Combiner<M> combiner;
 
   /**
    * Protected default constructor for LocalBSPRunner.
@@ -105,6 +110,18 @@ public final class BSPPeerImpl<K1, V1, K
   }
 
   /**
+   * For unit test.
+   * 
+   * @param conf is the configuration file.
+   * @param dfs is the Hadoop FileSystem.
+   * @param counters is the counters from outside.
+   */
+  public BSPPeerImpl(final Configuration conf, FileSystem dfs, Counters counters) {
+    this(conf, dfs);
+    this.counters = counters;
+  }
+
+  /**
    * BSPPeer Constructor.
    * 
    * BSPPeer acts on behalf of clients performing bsp() tasks.
@@ -114,6 +131,7 @@ public final class BSPPeerImpl<K1, V1, K
    * @param taskId is the id that current process holds.
    * @throws Exception
    */
+  @SuppressWarnings("unchecked")
   public BSPPeerImpl(BSPJob job, Configuration conf, TaskAttemptID taskId,
       BSPPeerProtocol umbilical, int partition, String splitClass,
       BytesWritable split, Counters counters) throws Exception {
@@ -150,7 +168,13 @@ public final class BSPPeerImpl<K1, V1, K
         TaskStatus.Phase.STARTING, counters));
 
     messenger = MessageManagerFactory.getMessageManager(conf);
-    messenger.init(conf, peerAddress);
+    messenger.init(this, conf, peerAddress);
+
+    final String combinerName = conf.get("bsp.combiner.class");
+    if (combinerName != null) {
+      combiner = (Combiner<M>) ReflectionUtils.newInstance(
+          conf.getClassByName(combinerName), conf);
+    }
 
   }
 
@@ -200,7 +224,10 @@ public final class BSPPeerImpl<K1, V1, K
       if (in != null) {
         in.close();
       }
-      in = bspJob.getInputFormat().getRecordReader(inputSplit, bspJob);
+      in = new TrackedRecordReader<K1, V1>(bspJob.getInputFormat()
+          .getRecordReader(inputSplit, bspJob),
+          getCounter(BSPPeerImpl.PeerCounter.TASK_INPUT_RECORDS),
+          getCounter(BSPPeerImpl.PeerCounter.IO_BYTES_READ));
     }
   }
 
@@ -211,6 +238,7 @@ public final class BSPPeerImpl<K1, V1, K
 
   @Override
   public final void send(String peerName, M msg) throws IOException {
+    incrementCounter(PeerCounter.TOTAL_MESSAGES_SENT, 1L);
     messenger.send(peerName, msg);
   }
 
@@ -266,6 +294,7 @@ public final class BSPPeerImpl<K1, V1, K
   @Override
   public final void sync() throws IOException, SyncException,
       InterruptedException {
+    long startBarrier = System.currentTimeMillis();
     enterBarrier();
     Iterator<Entry<InetSocketAddress, LinkedList<M>>> it = messenger
         .getMessageIterator();
@@ -295,7 +324,9 @@ public final class BSPPeerImpl<K1, V1, K
 
     leaveBarrier();
 
-    incrCounter(PeerCounter.SUPERSTEP_SUM, 1L);
+    incrementCounter(PeerCounter.TIME_IN_SYNC_MS,
+        (System.currentTimeMillis() - startBarrier));
+    incrementCounter(PeerCounter.SUPERSTEP_SUM, 1L);
 
     currentTaskStatus.setCounters(counters);
 
@@ -305,23 +336,15 @@ public final class BSPPeerImpl<K1, V1, K
   }
 
   private final BSPMessageBundle<M> combineMessages(Iterable<M> messages) {
-    if (!conf.getClass("bsp.combiner.class", Combiner.class).equals(
-        Combiner.class)) {
-      @SuppressWarnings("unchecked")
-      Combiner<M> combiner = (Combiner<M>) ReflectionUtils.newInstance(
-          conf.getClass("bsp.combiner.class", Combiner.class), conf);
-
-      BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+    BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+    if (combiner != null) {
       bundle.addMessage(combiner.combine(messages));
-      
-      return bundle;
     } else {
-      BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
       for (M message : messages) {
         bundle.addMessage(message);
       }
-      return bundle;
     }
+    return bundle;
   }
 
   protected final void enterBarrier() throws SyncException {
@@ -423,6 +446,7 @@ public final class BSPPeerImpl<K1, V1, K
 
   @Override
   public final void write(K2 key, V2 value) throws IOException {
+    incrementCounter(PeerCounter.TASK_OUTPUT_RECORDS, 1);
     collector.collect(key, value);
   }
 
@@ -461,21 +485,22 @@ public final class BSPPeerImpl<K1, V1, K
     return counter;
   }
 
+  public Counters getCounters() {
+    return counters;
+  }
+
   @Override
-  public final void incrCounter(Enum<?> key, long amount) {
+  public final void incrementCounter(Enum<?> key, long amount) {
     if (counters != null) {
       counters.incrCounter(key, amount);
     }
   }
 
   @Override
-  public final void incrCounter(String group, String counter, long amount) {
+  public final void incrementCounter(String group, String counter, long amount) {
     if (counters != null) {
       counters.incrCounter(group, counter, amount);
     }
   }
 
-  public Counters getCounters() {
-    return counters;
-  }
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1298541&r1=1298540&r2=1298541&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Thu Mar  8 19:59:36 2012
@@ -336,8 +336,11 @@ public class LocalBSPRunner implements J
     private static final ConcurrentHashMap<String, InetSocketAddress> socketCache = new ConcurrentHashMap<String, InetSocketAddress>();
     private final LinkedBlockingDeque<M> localIncomingMessages = new LinkedBlockingDeque<M>();
 
+    private BSPPeer<?, ?, ?, ?, M> peer;
+
     @Override
-    public void init(Configuration conf, InetSocketAddress peerAddress) {
+    public void init(BSPPeer<?, ?, ?, ?, M> peer, Configuration conf, InetSocketAddress peerAddress) {
+      this.peer = peer;
       managerMap.put(peerAddress, this);
     }
 
@@ -367,7 +370,7 @@ public class LocalBSPRunner implements J
         msgs = new LinkedList<M>();
       }
       msgs.add(msg);
-
+      peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
       localOutgoingMessages.put(inetSocketAddress, msgs);
     }
 
@@ -377,6 +380,7 @@ public class LocalBSPRunner implements J
         throws IOException {
       for (M value : bundle.getMessages()) {
         managerMap.get(addr).localIncomingMessages.add(value);
+        peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
       }
     }
 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java?rev=1298541&r1=1298540&r2=1298541&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java Thu Mar  8 19:59:36 2012
@@ -39,10 +39,12 @@ import org.apache.avro.ipc.specific.Spec
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
 import org.apache.hama.util.BSPNetUtils;
 
-public class AvroMessageManagerImpl<M extends Writable> extends
+public final class AvroMessageManagerImpl<M extends Writable> extends
     CompressableMessageManager<M> implements Sender<M> {
 
   private NettyServer server = null;
@@ -55,8 +57,12 @@ public class AvroMessageManagerImpl<M ex
   // this must be a synchronized implementation: this is accessed per RPC
   private final ConcurrentLinkedQueue<M> localQueueForNextIteration = new ConcurrentLinkedQueue<M>();
 
+  private BSPPeer<?, ?, ?, ?, M> peer;
+
   @Override
-  public void init(Configuration conf, InetSocketAddress addr) {
+  public void init(BSPPeer<?, ?, ?, ?, M> peer, Configuration conf,
+      InetSocketAddress addr) {
+    this.peer = peer;
     super.initCompression(conf);
     server = new NettyServer(new SpecificResponder(Sender.class, this), addr);
   }
@@ -74,8 +80,13 @@ public class AvroMessageManagerImpl<M ex
   }
 
   public void put(BSPMessageBundle<M> messages) {
-    for (M message : messages.getMessages()) {
-      this.localQueueForNextIteration.add(message);
+    peer.incrementCounter(
+        BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, messages.getMessages()
+            .size());
+    Iterator<M> iterator = messages.getMessages().iterator();
+    while (iterator.hasNext()) {
+      this.localQueueForNextIteration.add(iterator.next());
+      iterator.remove();
     }
   }
 
@@ -140,13 +151,17 @@ public class AvroMessageManagerImpl<M ex
   private final BSPMessageBundle<M> deserializeMessage(ByteBuffer buffer)
       throws IOException {
     BSPMessageBundle<M> msg = new BSPMessageBundle<M>();
+    byte[] byteArray = buffer.array();
     if (compressor == null) {
-      ByteArrayInputStream inArray = new ByteArrayInputStream(buffer.array());
+      peer.incrementCounter(
+          BSPPeerImpl.PeerCounter.MESSAGE_BYTES_RECEIVED, byteArray.length);
+      ByteArrayInputStream inArray = new ByteArrayInputStream(byteArray);
       DataInputStream in = new DataInputStream(inArray);
       msg.readFields(in);
     } else {
-      msg = compressor
-          .decompressBundle(new BSPCompressedBundle(buffer.array()));
+      peer.incrementCounter(
+          BSPPeerImpl.PeerCounter.COMPRESSED_BYTES_RECEIVED, byteArray.length);
+      msg = compressor.decompressBundle(new BSPCompressedBundle(byteArray));
     }
 
     return msg;
@@ -159,10 +174,16 @@ public class AvroMessageManagerImpl<M ex
       DataOutputStream out = new DataOutputStream(outArray);
       msg.write(out);
       out.close();
-      return ByteBuffer.wrap(outArray.toByteArray());
+      byte[] byteArray = outArray.toByteArray();
+      peer.incrementCounter(
+          BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED, byteArray.length);
+      return ByteBuffer.wrap(byteArray);
     } else {
       BSPCompressedBundle compMsgBundle = compressor.compressBundle(msg);
-      return ByteBuffer.wrap(compMsgBundle.getData());
+      byte[] data = compMsgBundle.getData();
+      peer.incrementCounter(
+          BSPPeerImpl.PeerCounter.COMPRESSED_BYTES_SENT, data.length);
+      return ByteBuffer.wrap(data);
     }
   }
 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java?rev=1298541&r1=1298540&r2=1298541&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java Thu Mar  8 19:59:36 2012
@@ -33,6 +33,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
 import org.apache.hama.util.BSPNetUtils;
 import org.apache.hama.util.CompressionUtil;
@@ -57,9 +59,11 @@ public final class HadoopMessageManagerI
   private Deque<M> localQueue = new LinkedList<M>();
   // this must be a synchronized implementation: this is accessed per RPC
   private final ConcurrentLinkedQueue<M> localQueueForNextIteration = new ConcurrentLinkedQueue<M>();
+  private BSPPeer<?, ?, ?, ?, M> peer;
 
   @Override
-  public final void init(Configuration conf, InetSocketAddress peerAddress) {
+  public final void init(BSPPeer<?, ?, ?, ?, M> peer, Configuration conf, InetSocketAddress peerAddress) {
+    this.peer = peer;
     this.conf = conf;
     super.initCompression(conf);
     startRPCServer(conf, peerAddress);
@@ -107,6 +111,7 @@ public final class HadoopMessageManagerI
       queue = new LinkedList<M>();
     }
     queue.add(msg);
+    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
     outgoingQueues.put(targetPeerAddress, queue);
   }
 
@@ -160,6 +165,8 @@ public final class HadoopMessageManagerI
   @Override
   public final void put(M msg) {
     this.localQueueForNextIteration.add(msg);
+    peer.incrementCounter(
+        BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
   }
 
   @Override

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1298541&r1=1298540&r2=1298541&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java Thu Mar  8 19:59:36 2012
@@ -26,6 +26,7 @@ import java.util.Map.Entry;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.BSPPeer;
 
 /**
  * This manager takes care of the messaging. It is responsible to launch a
@@ -40,7 +41,8 @@ public interface MessageManager<M extend
    * @param conf
    * @param peerAddress
    */
-  public void init(Configuration conf, InetSocketAddress peerAddress);
+  public void init(BSPPeer<?, ?, ?, ?, M> peer, Configuration conf,
+      InetSocketAddress peerAddress);
 
   /**
    * Close is called after a task ran. Should be used to cleanup things e.G.

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java?rev=1298541&r1=1298540&r2=1298541&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java Thu Mar  8 19:59:36 2012
@@ -23,8 +23,15 @@ import java.util.Random;
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.BSPPeerImpl;
+import org.apache.hama.bsp.Counters;
+import org.apache.hama.bsp.message.compress.BSPMessageCompressorFactory;
+import org.apache.hama.bsp.message.compress.SnappyCompressor;
 import org.apache.hama.bsp.messages.BooleanMessage;
 import org.apache.hama.bsp.messages.DoubleMessage;
 import org.apache.hama.bsp.messages.IntegerMessage;
@@ -44,12 +51,18 @@ public class TestAvroMessageManager exte
     Configuration conf = new Configuration();
     MessageManager<Writable> messageManager = MessageManagerFactory
         .getMessageManager(conf);
+    conf.set(BSPMessageCompressorFactory.COMPRESSION_CODEC_CLASS,
+        SnappyCompressor.class.getName());
 
     assertTrue(messageManager instanceof AvroMessageManagerImpl);
 
     InetSocketAddress peer = new InetSocketAddress(
         BSPNetUtils.getCanonicalHostname(), BSPNetUtils.getFreePort());
-    messageManager.init(conf, peer);
+
+    BSPPeer<?, ?, ?, ?, Writable> dummyPeer = new BSPPeerImpl<NullWritable, NullWritable, NullWritable, NullWritable, Writable>(
+        conf, FileSystem.get(conf), new Counters());
+
+    messageManager.init(dummyPeer, conf, peer);
 
     messageManager.transfer(peer, randomBundle);
 

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java?rev=1298541&r1=1298540&r2=1298541&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java Thu Mar  8 19:59:36 2012
@@ -25,8 +25,13 @@ import java.util.Map.Entry;
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.BSPPeerImpl;
+import org.apache.hama.bsp.Counters;
 import org.apache.hama.util.BSPNetUtils;
 
 public class TestHadoopMessageManager extends TestCase {
@@ -42,7 +47,9 @@ public class TestHadoopMessageManager ex
 
     InetSocketAddress peer = new InetSocketAddress(
         BSPNetUtils.getCanonicalHostname(), BSPNetUtils.getFreePort());
-    messageManager.init(conf, peer);
+    BSPPeer<?, ?, ?, ?, IntWritable> dummyPeer = new BSPPeerImpl<NullWritable, NullWritable, NullWritable, NullWritable, IntWritable>(
+        conf, FileSystem.get(conf), new Counters());
+    messageManager.init(dummyPeer, conf, peer);
     String peerName = peer.getHostName() + ":" + peer.getPort();
 
     messageManager.send(peerName, new IntWritable(1337));