You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2015/04/28 02:14:43 UTC
svn commit: r1676398 - in /hama/trunk: conf/
core/src/main/java/org/apache/hama/bsp/
core/src/main/java/org/apache/hama/bsp/message/
core/src/test/java/org/apache/hama/bsp/
Author: edwardyoon
Date: Tue Apr 28 00:14:43 2015
New Revision: 1676398
URL: http://svn.apache.org/r1676398
Log:
HAMA-954: Make message bundle behaviour configurable
Modified:
hama/trunk/conf/hama-default.xml
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
Modified: hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/hama/trunk/conf/hama-default.xml?rev=1676398&r1=1676397&r2=1676398&view=diff
==============================================================================
--- hama/trunk/conf/hama-default.xml (original)
+++ hama/trunk/conf/hama-default.xml Tue Apr 28 00:14:43 2015
@@ -195,6 +195,11 @@
<value>5</value>
<description>The number of method handler threads to run.</description>
</property>
+ <property>
+ <name>hama.bsp.messenger.bundle</name>
+ <value>true</value>
+ <description>If true (the default), outgoing messages are bundled to reduce RPC overhead; if false, each message is transferred directly to its target peer.</description>
+ </property>
<property>
<name>bsp.input.runtime.partitioning</name>
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1676398&r1=1676397&r2=1676398&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Tue Apr 28 00:14:43 2015
@@ -18,7 +18,6 @@
package org.apache.hama.bsp;
import java.io.IOException;
-import java.util.List;
import org.apache.hadoop.io.Writable;
import org.apache.hama.Constants;
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1676398&r1=1676397&r2=1676398&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Tue Apr 28 00:14:43 2015
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
@@ -50,6 +51,7 @@ import org.apache.hama.bsp.sync.SyncServ
import org.apache.hama.commons.util.KeyValuePair;
import org.apache.hama.ipc.BSPPeerProtocol;
import org.apache.hama.pipes.util.DistributedCacheUtil;
+import org.apache.hama.util.BSPNetUtils;
import org.apache.hama.util.DistCacheUtils;
/**
@@ -358,7 +360,26 @@ public final class BSPPeerImpl<K1, V1, K
@Override
public final void send(String peerName, M msg) throws IOException {
- messenger.send(peerName, msg);
+ if (!conf.getBoolean("hama.bsp.messenger.bundle", true)) {
+ sendDirectly(peerName, msg);
+ } else {
+ messenger.send(peerName, msg);
+ }
+ }
+
+ private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
+
+ public final void sendDirectly(String peerName, M msg) throws IOException {
+ InetSocketAddress targetPeerAddress = null;
+ // Get socket for target peer.
+ if (peerSocketCache.containsKey(peerName)) {
+ targetPeerAddress = peerSocketCache.get(peerName);
+ } else {
+ targetPeerAddress = BSPNetUtils.getAddress(peerName);
+ peerSocketCache.put(peerName, targetPeerAddress);
+ }
+
+ messenger.transfer(targetPeerAddress, msg);
}
/*
@@ -511,7 +532,7 @@ public final class BSPPeerImpl<K1, V1, K
public final void clear() {
messenger.clearOutgoingMessages();
}
-
+
/**
* @return the string as host:port of this Peer
*/
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1676398&r1=1676397&r2=1676398&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Tue Apr 28 00:14:43 2015
@@ -361,6 +361,13 @@ public class LocalBSPRunner implements J
public InetSocketAddress getListenerAddress() {
return selfAddress;
}
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void transfer(InetSocketAddress addr, M msg) throws IOException {
+ MANAGER_MAP.get(addr).localQueueForNextIteration.add(msg);
+ peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1);
+ }
}
public static class LocalUmbilical implements BSPPeerProtocol {
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1676398&r1=1676397&r2=1676398&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Tue Apr 28 00:14:43 2015
@@ -178,7 +178,7 @@ public abstract class AbstractMessageMan
notifySentMessage(peerName, msg);
}
-
+
/*
* (non-Javadoc)
* @see org.apache.hama.bsp.message.MessageManager#getMessageIterator()
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java?rev=1676398&r1=1676397&r2=1676398&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java Tue Apr 28 00:14:43 2015
@@ -129,12 +129,13 @@ public final class HamaAsyncMessageManag
bundle.write(bufferDos);
byte[] compressed = compressor.compress(byteBuffer.toByteArray());
- peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
+ peer.incrementCounter(
+ BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
compressed.length);
bspPeerConnection.put(compressed);
} else {
- //peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
- // bundle.getLength());
+ // peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
+ // bundle.getLength());
bspPeerConnection.put(bundle);
}
}
@@ -197,4 +198,15 @@ public final class HamaAsyncMessageManag
return null;
}
+ @Override
+ public void transfer(InetSocketAddress addr, M msg) throws IOException {
+ HamaMessageManager<M> bspPeerConnection = this.getBSPPeerConnection(addr);
+ if (bspPeerConnection == null) {
+ throw new IllegalArgumentException("Can not find " + addr.toString()
+ + " to transfer messages to!");
+ } else {
+ bspPeerConnection.put(msg);
+ }
+ }
+
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java?rev=1676398&r1=1676397&r2=1676398&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java Tue Apr 28 00:14:43 2015
@@ -196,4 +196,15 @@ public final class HamaMessageManagerImp
return null;
}
+ @Override
+ public void transfer(InetSocketAddress addr, M msg) throws IOException {
+ HamaMessageManager<M> bspPeerConnection = this.getBSPPeerConnection(addr);
+ if (bspPeerConnection == null) {
+ throw new IllegalArgumentException("Can not find " + addr.toString()
+ + " to transfer messages to!");
+ } else {
+ bspPeerConnection.put(msg);
+ }
+ }
+
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1676398&r1=1676397&r2=1676398&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java Tue Apr 28 00:14:43 2015
@@ -85,6 +85,8 @@ public interface MessageManager<M extend
public void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
throws IOException;
+ public void transfer(InetSocketAddress addr, M msg) throws IOException;
+
/**
* Clears the outgoing queue. Can be used to switch queues.
*/
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1676398&r1=1676397&r2=1676398&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Tue Apr 28 00:14:43 2015
@@ -151,6 +151,12 @@ public class TestCheckpoint extends Test
return null;
}
+ @Override
+ public void transfer(InetSocketAddress addr, Text msg) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
}
public static class TestBSPPeer implements