You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/02/21 09:39:51 UTC

svn commit: r1072881 - in /incubator/hama/trunk/src: examples/org/apache/hama/examples/RandBench.java java/org/apache/hama/Constants.java java/org/apache/hama/bsp/BSPPeer.java

Author: edwardyoon
Date: Mon Feb 21 08:39:51 2011
New Revision: 1072881

URL: http://svn.apache.org/viewvc?rev=1072881&view=rev
Log:
For testing on a large cluster

Modified:
    incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java
    incubator/hama/trunk/src/java/org/apache/hama/Constants.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java?rev=1072881&r1=1072880&r2=1072881&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java Mon Feb 21 08:39:51 2011
@@ -20,6 +20,8 @@ package org.apache.hama.examples;
 import java.io.IOException;
 import java.util.Random;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSP;
@@ -28,6 +30,7 @@ import org.apache.hama.bsp.BSPJobClient;
 import org.apache.hama.bsp.BSPMessage;
 import org.apache.hama.bsp.BSPPeerProtocol;
 import org.apache.hama.bsp.ClusterStatus;
+import org.apache.hama.examples.PiEstimator.MyEstimator;
 import org.apache.hama.util.Bytes;
 import org.apache.zookeeper.KeeperException;
 
@@ -37,6 +40,7 @@ public class RandBench {
   private static final String N_SUPERSTEPS = "supersteps.num";
 
   public static class RandBSP extends BSP {
+    public static final Log LOG = LogFactory.getLog(MyEstimator.class);
     private Configuration conf;
     private Random r = new Random();
     private int sizeOfMsg;
@@ -61,8 +65,11 @@ public class RandBench {
         }
 
         bspPeer.sync();
-        // clear whole queues entries
-        bspPeer.clear();
+
+        BSPMessage received;
+        while ((received = bspPeer.getCurrentMessage()) != null) {
+          LOG.info(Bytes.toString(received.getTag()) + " : " + received.getData().length);
+        }
         
       }
     }

Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=1072881&r1=1072880&r2=1072881&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Mon Feb 21 08:39:51 2011
@@ -46,7 +46,7 @@ public interface Constants {
   /** Default port region server listens on. */
   public static final int DEFAULT_PEER_PORT = 61000;
 
-  public static final long ATLEAST_WAIT_TIME = 1000;
+  public static final long ATLEAST_WAIT_TIME = 10;
   public static final String PEER_ID = "bsp.peer.id";
   
   /** Parameter name for what groom server implementation to use. */

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1072881&r1=1072880&r2=1072881&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Mon Feb 21 08:39:51 2011
@@ -154,6 +154,7 @@ public class BSPPeer implements Watcher,
    */
   @Override
   public void sync() throws IOException, KeeperException, InterruptedException {
+    enterBarrier();
     Iterator<Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>> it = this.outgoingQueues
         .entrySet().iterator();
     Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> entry;
@@ -181,14 +182,16 @@ public class BSPPeer implements Watcher,
     // Clear outgoing queues.
     clearOutgoingQueues();
 
-    enterBarrier();
-    Thread.sleep(Constants.ATLEAST_WAIT_TIME); // TODO - This is temporary work
+    // TODO - This is temporary work
     // because
     // it can be affected by network condition,
     // the number of peers, and the load of zookeeper.
     // It should fixed to some flawless way.
-    leaveBarrier();
+    
+    Thread.sleep(Constants.ATLEAST_WAIT_TIME); 
+    
     currentTaskStatus.incrementSuperstepCount();
+    leaveBarrier();
   }
 
   protected boolean enterBarrier() throws KeeperException, InterruptedException {