You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2009/12/10 07:31:14 UTC
svn commit: r889112 -
/incubator/hama/trunk/src/test/org/apache/hama/bsp/SerializePrinting.java
Author: edwardyoon
Date: Thu Dec 10 06:31:14 2009
New Revision: 889112
URL: http://svn.apache.org/viewvc?rev=889112&view=rev
Log:
Serialize Printing of HelloWorld using BSP
Added:
incubator/hama/trunk/src/test/org/apache/hama/bsp/SerializePrinting.java
Added: incubator/hama/trunk/src/test/org/apache/hama/bsp/SerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/SerializePrinting.java?rev=889112&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/SerializePrinting.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/SerializePrinting.java Thu Dec 10 06:31:14 2009
@@ -0,0 +1,121 @@
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.HamaCluster;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Serialize Printing of Hello World
+ */
+public class SerializePrinting extends HamaCluster implements Watcher {
+ private Log LOG = LogFactory.getLog(SerializePrinting.class);
+ private int NUM_PEER = 10;
+ List<BSPPeerThread> list = new ArrayList<BSPPeerThread>(NUM_PEER);
+ List<String> echo = new ArrayList<String>();
+ Configuration conf;
+
+ public SerializePrinting() {
+ this.conf = getConf();
+ }
+
+ public void setUp() throws Exception {
+ super.setUp();
+
+ ZooKeeper zk = new ZooKeeper("localhost:21810", 3000, this);
+ Stat s = null;
+ if (zk != null) {
+ try {
+ s = zk.exists(BSPConstants.DEFAULT_ZOOKEEPER_ROOT, false);
+ } catch (Exception e) {
+ LOG.error(s);
+ }
+
+ if (s == null) {
+ try {
+ zk.create(BSPConstants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch (KeeperException e) {
+ LOG.error(e);
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ }
+ }
+ }
+ }
+
+ public void testHelloWorld() throws InterruptedException, IOException {
+
+ BSPPeerThread thread;
+ int[] randomSequence = new int[] { 2, 3, 4, 5, 0, 1, 6, 7, 8, 9 };
+ for (int i = 0; i < NUM_PEER; i++) {
+ conf.set("bsp.peers.num", String.valueOf(NUM_PEER));
+ conf.set(BSPConstants.PEER_HOST, "localhost");
+ conf.set(BSPConstants.PEER_PORT, String
+ .valueOf(30000 + randomSequence[i]));
+ conf.set(BSPConstants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
+ thread = new BSPPeerThread(conf, randomSequence[i]);
+ System.out.println(randomSequence[i] + ", " + thread.getName());
+ list.add(thread);
+ }
+
+ for (int i = 0; i < NUM_PEER; i++) {
+ list.get(i).start();
+ Thread.sleep(1000);
+ }
+
+ for (int i = 0; i < NUM_PEER; i++) {
+ list.get(i).join();
+ }
+ }
+
+ public class BSPPeerThread extends Thread {
+ private BSPPeer peer;
+ private int myId;
+
+ public BSPPeerThread(Configuration conf, int myId) throws IOException {
+ this.peer = new BSPPeer(conf);
+ this.myId = myId;
+ }
+
+ @Override
+ public void run() {
+ for (int i = 0; i < NUM_PEER; i++) {
+ if (myId == i) {
+ echo.add(getName());
+ System.out.println("Hello BSP from " + i + " of " + NUM_PEER + ": "
+ + getName());
+ }
+
+ try {
+ Thread.sleep(2000);
+ peer.sync();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (KeeperException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ }
+ }
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ // TODO Auto-generated method stub
+
+ }
+}