You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2012/11/22 12:14:43 UTC
svn commit: r1412508 - in
/hama/trunk/core/src/main/java/org/apache/hama/bsp: BSPPeerImpl.java
message/HadoopMessageManagerImpl.java
Author: edwardyoon
Date: Thu Nov 22 11:14:42 2012
New Revision: 1412508
URL: http://svn.apache.org/viewvc?rev=1412508&view=rev
Log:
Add compressed messages counter
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1412508&r1=1412507&r2=1412508&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Nov 22 11:14:42 2012
@@ -58,10 +58,7 @@ public final class BSPPeerImpl<K1, V1, K
private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
public static enum PeerCounter {
- SUPERSTEP_SUM, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS,
- IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED,
- TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT,
- COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
+ COMPRESSED_MESSAGES, SUPERSTEP_SUM, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
}
private final Configuration conf;
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java?rev=1412508&r1=1412507&r2=1412508&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java Thu Nov 22 11:14:42 2012
@@ -29,6 +29,7 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
import org.apache.hama.ipc.HamaRPCProtocolVersion;
@@ -100,14 +101,12 @@ public final class HadoopMessageManagerI
throw new IllegalArgumentException("Can not find " + addr.toString()
+ " to transfer messages to!");
} else {
- if (compressor != null) {
- if (bundle.getApproximateSize() > conf.getLong(
- "hama.messenger.compression.threshold", 1048576)) {
- BSPCompressedBundle compMsgBundle = compressor.compressBundle(bundle);
- bspPeerConnection.put(compMsgBundle);
- } else {
- bspPeerConnection.put(bundle);
- }
+ if (compressor != null
+ && (bundle.getApproximateSize() > conf.getLong(
+ "hama.messenger.compression.threshold", 1048576))) {
+ BSPCompressedBundle compMsgBundle = compressor.compressBundle(bundle);
+ bspPeerConnection.put(compMsgBundle);
+ peer.incrementCounter(BSPPeerImpl.PeerCounter.COMPRESSED_MESSAGES, 1L);
} else {
bspPeerConnection.put(bundle);
}