You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/02/21 09:39:51 UTC
svn commit: r1072881 - in /incubator/hama/trunk/src:
examples/org/apache/hama/examples/RandBench.java
java/org/apache/hama/Constants.java java/org/apache/hama/bsp/BSPPeer.java
Author: edwardyoon
Date: Mon Feb 21 08:39:51 2011
New Revision: 1072881
URL: http://svn.apache.org/viewvc?rev=1072881&view=rev
Log:
For testing on a large cluster
Modified:
incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java
incubator/hama/trunk/src/java/org/apache/hama/Constants.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java?rev=1072881&r1=1072880&r2=1072881&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java Mon Feb 21 08:39:51 2011
@@ -20,6 +20,8 @@ package org.apache.hama.examples;
import java.io.IOException;
import java.util.Random;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSP;
@@ -28,6 +30,7 @@ import org.apache.hama.bsp.BSPJobClient;
import org.apache.hama.bsp.BSPMessage;
import org.apache.hama.bsp.BSPPeerProtocol;
import org.apache.hama.bsp.ClusterStatus;
+import org.apache.hama.examples.PiEstimator.MyEstimator;
import org.apache.hama.util.Bytes;
import org.apache.zookeeper.KeeperException;
@@ -37,6 +40,7 @@ public class RandBench {
private static final String N_SUPERSTEPS = "supersteps.num";
public static class RandBSP extends BSP {
+ public static final Log LOG = LogFactory.getLog(MyEstimator.class);
private Configuration conf;
private Random r = new Random();
private int sizeOfMsg;
@@ -61,8 +65,11 @@ public class RandBench {
}
bspPeer.sync();
- // clear whole queues entries
- bspPeer.clear();
+
+ BSPMessage received;
+ while ((received = bspPeer.getCurrentMessage()) != null) {
+ LOG.info(Bytes.toString(received.getTag()) + " : " + received.getData().length);
+ }
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=1072881&r1=1072880&r2=1072881&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Mon Feb 21 08:39:51 2011
@@ -46,7 +46,7 @@ public interface Constants {
/** Default port region server listens on. */
public static final int DEFAULT_PEER_PORT = 61000;
- public static final long ATLEAST_WAIT_TIME = 1000;
+ public static final long ATLEAST_WAIT_TIME = 10;
public static final String PEER_ID = "bsp.peer.id";
/** Parameter name for what groom server implementation to use. */
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1072881&r1=1072880&r2=1072881&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Mon Feb 21 08:39:51 2011
@@ -154,6 +154,7 @@ public class BSPPeer implements Watcher,
*/
@Override
public void sync() throws IOException, KeeperException, InterruptedException {
+ enterBarrier();
Iterator<Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>> it = this.outgoingQueues
.entrySet().iterator();
Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> entry;
@@ -181,14 +182,16 @@ public class BSPPeer implements Watcher,
// Clear outgoing queues.
clearOutgoingQueues();
- enterBarrier();
- Thread.sleep(Constants.ATLEAST_WAIT_TIME); // TODO - This is temporary work
+ // TODO - This is temporary work
// because
// it can be affected by network condition,
// the number of peers, and the load of zookeeper.
// It should fixed to some flawless way.
- leaveBarrier();
+
+ Thread.sleep(Constants.ATLEAST_WAIT_TIME);
+
currentTaskStatus.incrementSuperstepCount();
+ leaveBarrier();
}
protected boolean enterBarrier() throws KeeperException, InterruptedException {