You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2015/04/28 02:14:43 UTC

svn commit: r1676398 - in /hama/trunk: conf/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/ core/src/test/java/org/apache/hama/bsp/

Author: edwardyoon
Date: Tue Apr 28 00:14:43 2015
New Revision: 1676398

URL: http://svn.apache.org/r1676398
Log:
HAMA-954: Make message bundle behaviour configurable

Modified:
    hama/trunk/conf/hama-default.xml
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java

Modified: hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/hama/trunk/conf/hama-default.xml?rev=1676398&r1=1676397&r2=1676398&view=diff
==============================================================================
--- hama/trunk/conf/hama-default.xml (original)
+++ hama/trunk/conf/hama-default.xml Tue Apr 28 00:14:43 2015
@@ -195,6 +195,11 @@
     <value>5</value>
     <description>The number of method handler threads to run.</description>
   </property>
+  <property>
+    <name>hama.bsp.messenger.bundle</name>
+    <value>true</value>
+    <description>If true, outgoing messages are bundled to reduce RPC overhead; if false, each message is transferred to its target peer individually.</description>
+  </property>
   
   <property>
     <name>bsp.input.runtime.partitioning</name>

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1676398&r1=1676397&r2=1676398&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Tue Apr 28 00:14:43 2015
@@ -18,7 +18,6 @@
 package org.apache.hama.bsp;
 
 import java.io.IOException;
-import java.util.List;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.Constants;

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1676398&r1=1676397&r2=1676398&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Tue Apr 28 00:14:43 2015
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
@@ -50,6 +51,7 @@ import org.apache.hama.bsp.sync.SyncServ
 import org.apache.hama.commons.util.KeyValuePair;
 import org.apache.hama.ipc.BSPPeerProtocol;
 import org.apache.hama.pipes.util.DistributedCacheUtil;
+import org.apache.hama.util.BSPNetUtils;
 import org.apache.hama.util.DistCacheUtils;
 
 /**
@@ -358,7 +360,26 @@ public final class BSPPeerImpl<K1, V1, K
 
   @Override
   public final void send(String peerName, M msg) throws IOException {
-    messenger.send(peerName, msg);
+    if (!conf.getBoolean("hama.bsp.messenger.bundle", true)) {
+      sendDirectly(peerName, msg);
+    } else {
+      messenger.send(peerName, msg);
+    }
+  }
+
+  private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
+
+  public final void sendDirectly(String peerName, M msg) throws IOException {
+    InetSocketAddress targetPeerAddress = null;
+    // Look up the target peer's address, resolving and caching it on first use.
+    if (peerSocketCache.containsKey(peerName)) {
+      targetPeerAddress = peerSocketCache.get(peerName);
+    } else {
+      targetPeerAddress = BSPNetUtils.getAddress(peerName);
+      peerSocketCache.put(peerName, targetPeerAddress);
+    }
+
+    messenger.transfer(targetPeerAddress, msg);
   }
 
   /*
@@ -511,7 +532,7 @@ public final class BSPPeerImpl<K1, V1, K
   public final void clear() {
     messenger.clearOutgoingMessages();
   }
-  
+
   /**
    * @return the string as host:port of this Peer
    */

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1676398&r1=1676397&r2=1676398&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Tue Apr 28 00:14:43 2015
@@ -361,6 +361,13 @@ public class LocalBSPRunner implements J
     public InetSocketAddress getListenerAddress() {
       return selfAddress;
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void transfer(InetSocketAddress addr, M msg) throws IOException {
+      MANAGER_MAP.get(addr).localQueueForNextIteration.add(msg);
+      peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1);
+    }
   }
 
   public static class LocalUmbilical implements BSPPeerProtocol {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1676398&r1=1676397&r2=1676398&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Tue Apr 28 00:14:43 2015
@@ -178,7 +178,7 @@ public abstract class AbstractMessageMan
 
     notifySentMessage(peerName, msg);
   }
-
+  
   /*
    * (non-Javadoc)
    * @see org.apache.hama.bsp.message.MessageManager#getMessageIterator()

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java?rev=1676398&r1=1676397&r2=1676398&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java Tue Apr 28 00:14:43 2015
@@ -129,12 +129,13 @@ public final class HamaAsyncMessageManag
         bundle.write(bufferDos);
 
         byte[] compressed = compressor.compress(byteBuffer.toByteArray());
-        peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
+        peer.incrementCounter(
+            BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
             compressed.length);
         bspPeerConnection.put(compressed);
       } else {
-        //peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
-        //    bundle.getLength());
+        // peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
+        // bundle.getLength());
         bspPeerConnection.put(bundle);
       }
     }
@@ -197,4 +198,15 @@ public final class HamaAsyncMessageManag
     return null;
   }
 
+  @Override
+  public void transfer(InetSocketAddress addr, M msg) throws IOException {
+    HamaMessageManager<M> bspPeerConnection = this.getBSPPeerConnection(addr);
+    if (bspPeerConnection == null) {
+      throw new IllegalArgumentException("Can not find " + addr.toString()
+          + " to transfer messages to!");
+    } else {
+      bspPeerConnection.put(msg);
+    }
+  }
+
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java?rev=1676398&r1=1676397&r2=1676398&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java Tue Apr 28 00:14:43 2015
@@ -196,4 +196,15 @@ public final class HamaMessageManagerImp
     return null;
   }
 
+  @Override
+  public void transfer(InetSocketAddress addr, M msg) throws IOException {
+    HamaMessageManager<M> bspPeerConnection = this.getBSPPeerConnection(addr);
+    if (bspPeerConnection == null) {
+      throw new IllegalArgumentException("Can not find " + addr.toString()
+          + " to transfer messages to!");
+    } else {
+      bspPeerConnection.put(msg);
+    }
+  }
+
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1676398&r1=1676397&r2=1676398&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java Tue Apr 28 00:14:43 2015
@@ -85,6 +85,8 @@ public interface MessageManager<M extend
   public void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
       throws IOException;
 
+  public void transfer(InetSocketAddress addr, M msg) throws IOException;
+  
   /**
    * Clears the outgoing queue. Can be used to switch queues.
    */

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1676398&r1=1676397&r2=1676398&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Tue Apr 28 00:14:43 2015
@@ -151,6 +151,12 @@ public class TestCheckpoint extends Test
       return null;
     }
 
+    @Override
+    public void transfer(InetSocketAddress addr, Text msg) throws IOException {
+      // TODO Auto-generated method stub
+      
+    }
+
   }
 
   public static class TestBSPPeer implements