You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/02/14 14:13:15 UTC

svn commit: r1070482 - in /incubator/hama/trunk: CHANGES.txt src/examples/org/apache/hama/examples/PiEstimator.java src/java/org/apache/hama/bsp/BSPPeer.java

Author: edwardyoon
Date: Mon Feb 14 13:13:15 2011
New Revision: 1070482

URL: http://svn.apache.org/viewvc?rev=1070482&view=rev
Log:
Can't send more than one message to the same server in bsp() method

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1070482&r1=1070481&r2=1070482&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Mon Feb 14 13:13:15 2011
@@ -197,6 +197,7 @@ Trunk (unreleased changes)
 
   BUG FIXES
   
+    HAMA-352: Can't send one more messages on to same server in bsp() method (edwardyoon)
     HAMA-350: Add task log appender and Fix log4j rootLogger (edwardyoon)
     HAMA-345: Add execution time calculator to Pi job (edwardyoon)
     HAMA-344: Task successfully finished but system re-attempt (edwardyoon)

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1070482&r1=1070481&r2=1070482&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Mon Feb 14 13:13:15 2011
@@ -64,12 +64,14 @@ public class PiEstimator {
       BSPMessage estimate = new BSPMessage(tagName, myData);
 
       bspPeer.send(masterTask, estimate);
+      LOG.info("Send message:" + System.currentTimeMillis());
       bspPeer.sync();
 
       double pi = 0.0;
       BSPMessage received;
       while ((received = bspPeer.getCurrentMessage()) != null) {
-        LOG.info("Receives messages:" + Bytes.toDouble(received.getData()));
+        LOG.info("Receive messages:" + Bytes.toDouble(received.getData())
+            + " from " + Bytes.toString(received.getTag()));
         if (pi == 0.0) {
           pi = Bytes.toDouble(received.getData());
         } else {

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1070482&r1=1070481&r2=1070482&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Mon Feb 14 13:13:15 2011
@@ -140,12 +140,12 @@ public class BSPPeer implements Watcher,
   @Override
   public void send(String peerName, BSPMessage msg) throws IOException {
     LOG.debug("Send bytes (" + msg.getData().toString() + ") to " + peerName);
-    ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues.get(peerName);
+    ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues.get(getAddress(peerName));
     if (queue == null) {
       queue = new ConcurrentLinkedQueue<BSPMessage>();
-      outgoingQueues.put(getAddress(peerName), queue);
     }
     queue.add(msg);
+    outgoingQueues.put(getAddress(peerName), queue);
   }
 
   /*