You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2012/11/22 12:14:43 UTC

svn commit: r1412508 - in /hama/trunk/core/src/main/java/org/apache/hama/bsp: BSPPeerImpl.java message/HadoopMessageManagerImpl.java

Author: edwardyoon
Date: Thu Nov 22 11:14:42 2012
New Revision: 1412508

URL: http://svn.apache.org/viewvc?rev=1412508&view=rev
Log:
Add compressed messages counter

Modified:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1412508&r1=1412507&r2=1412508&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Nov 22 11:14:42 2012
@@ -58,10 +58,7 @@ public final class BSPPeerImpl<K1, V1, K
   private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
 
   public static enum PeerCounter {
-    SUPERSTEP_SUM, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS,
-    IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED,
-    TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT,
-    COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
+    COMPRESSED_MESSAGES, SUPERSTEP_SUM, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
   }
 
   private final Configuration conf;

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java?rev=1412508&r1=1412507&r2=1412508&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java Thu Nov 22 11:14:42 2012
@@ -29,6 +29,7 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
 import org.apache.hama.ipc.HamaRPCProtocolVersion;
@@ -100,14 +101,12 @@ public final class HadoopMessageManagerI
       throw new IllegalArgumentException("Can not find " + addr.toString()
           + " to transfer messages to!");
     } else {
-      if (compressor != null) {
-        if (bundle.getApproximateSize() > conf.getLong(
-            "hama.messenger.compression.threshold", 1048576)) {
-          BSPCompressedBundle compMsgBundle = compressor.compressBundle(bundle);
-          bspPeerConnection.put(compMsgBundle);
-        } else {
-          bspPeerConnection.put(bundle);
-        }
+      if (compressor != null
+          && (bundle.getApproximateSize() > conf.getLong(
+              "hama.messenger.compression.threshold", 1048576))) {
+        BSPCompressedBundle compMsgBundle = compressor.compressBundle(bundle);
+        bspPeerConnection.put(compMsgBundle);
+        peer.incrementCounter(BSPPeerImpl.PeerCounter.COMPRESSED_MESSAGES, 1L);
       } else {
         bspPeerConnection.put(bundle);
       }