You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by to...@apache.org on 2011/08/01 16:12:56 UTC

svn commit: r1152788 [7/9] - in /incubator/hama/trunk: ./ bin/ conf/ core/ core/bin/ core/conf/ core/src/ core/src/main/ core/src/main/java/ core/src/main/java/org/ core/src/main/java/org/apache/ core/src/main/java/org/apache/hama/ core/src/main/java/o...

Added: incubator/hama/trunk/core/src/main/webapp/bspmaster/index.html
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/webapp/bspmaster/index.html?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/webapp/bspmaster/index.html (added)
+++ incubator/hama/trunk/core/src/main/webapp/bspmaster/index.html Mon Aug  1 14:12:46 2011
@@ -0,0 +1,36 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<html>
+
+<head>
+<meta HTTP-EQUIV="REFRESH" content="0;url=bspmaster.jsp"/>
+<title>Hama Administration</title>
+</head>
+
+<body>
+
+<h1>Hama Administration</h1>
+
+<ul>
+
+<li><a href="bspmaster.jsp">BSPMaster</a></li>
+
+</ul>
+
+</body>
+
+</html>

Added: incubator/hama/trunk/core/src/main/webapp/bspmaster/machines.jsp
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/webapp/bspmaster/machines.jsp?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/webapp/bspmaster/machines.jsp (added)
+++ incubator/hama/trunk/core/src/main/webapp/bspmaster/machines.jsp Mon Aug  1 14:12:46 2011
@@ -0,0 +1,61 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<%@ page contentType="text/html; charset=UTF-8" import="javax.servlet.*"
+	import="javax.servlet.http.*" import="java.io.*" import="java.util.*"
+	import="java.text.DecimalFormat" import="org.apache.hama.bsp.*"
+	import="org.apache.hama.util.*"%>
+<%!private static final long serialVersionUID = 1L;%>
+<%
+	BSPMaster tracker = (BSPMaster) application
+			.getAttribute("bsp.master");
+	ClusterStatus status = tracker.getClusterStatus(true);
+	String trackerName = tracker.getBSPMasterName();
+	String type = request.getParameter("type");
+%>
+<%!public void generateGroomsTable(JspWriter out, String type,
+			ClusterStatus status, BSPMaster master) throws IOException {
+    // Writes one table row per active groom to 'out'; the 'type' argument is not used.
+    out.print("<center>\n");
+    out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
+    // The title row spans the three columns declared by the header row below.
+    out.print("<tr><td align=\"center\" colspan=\"3\"><b>Groom Servers</b></td></tr>\n");
+    out.print("<tr><td><b>Name</b></td><td><b>Host</b></td><td><b># running tasks</b></td></tr>\n");
+    for (Map.Entry<String, String> entry : status.getActiveGroomNames().entrySet()) {
+      out.print("<tr><td><a href=\"http://");
+      out.print(entry.getKey() + ":" + master.getHttpPort() + "/\">");
+      out.print(entry.getValue() + "</a></td><td>");
+      out.print(entry.getValue() + "</td>" + "<td>" + 1 + "</td></tr>\n");
+    }
+    out.print("</table>\n");
+    out.print("</center>\n");
+  }%>
+
+<html>
+
+<title><%=trackerName%> Hama Machine List</title>
+
+<body>
+<h1><a href="bspmaster.jsp"><%=trackerName%></a> Hama Machine List</h1>
+
+<h2>Grooms</h2>
+<%
+  generateGroomsTable(out, type, status, tracker);
+%>
+
+<%
+  out.println(BSPServletUtil.htmlFooter());
+%>
\ No newline at end of file

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,47 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Forming up the miniDfs and miniZooKeeper
+ */
+public abstract class HamaCluster extends HamaClusterTestCase {
+  public static final Log LOG = LogFactory.getLog(HamaCluster.class);
+  protected final static HamaConfiguration conf = new HamaConfiguration();
+  /** Uses the no-argument HamaClusterTestCase constructor, which leaves startDfs false. */
+  public HamaCluster(){
+    super();
+  }
+  /** @param startDfs passed unchanged to the HamaClusterTestCase constructor */
+  public HamaCluster(boolean startDfs) {
+    super(startDfs);
+  }
+  /** Delegates to the superclass set-up without adding anything. */
+  protected void setUp() throws Exception {
+    super.setUp();
+  }
+  /** Returns the static conf above; NOTE(review): it hides the inherited instance field conf. */
+  protected static HamaConfiguration getConf() {
+    return conf;
+  }
+}

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.File;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+public abstract class HamaClusterTestCase extends HamaTestCase {
+  public static final Log LOG = LogFactory.getLog(HamaClusterTestCase.class);
+  protected MiniDFSCluster dfsCluster;
+  protected MiniBSPCluster bspCluster;
+  protected MiniZooKeeperCluster zooKeeperCluster;
+  protected boolean startDfs;
+  protected int numOfGroom = 2;
+
+  /** default constructor */
+  public HamaClusterTestCase() {
+    this(false);
+  }
+
+  public HamaClusterTestCase(boolean startDfs) {
+    super();
+    this.startDfs = startDfs;
+  }
+
+  /**
+   * Starts a MiniZooKeeperCluster (handing it the unit-test directory), stores its
+   * client port in the configuration, then creates and starts the MiniBSPCluster.
+   */
+  protected void hamaClusterSetup() throws Exception {
+    File testDir = new File(getUnitTestdir(getName()).toString());
+    // Note that this is done before we create the MiniBSPCluster because we
+    // need to edit the config to add the ZooKeeper client port.
+    this.zooKeeperCluster = new MiniZooKeeperCluster();
+    int clientPort = this.zooKeeperCluster.startup(testDir);
+    conf.set("hama.zookeeper.property.clientPort", Integer.toString(clientPort));
+    bspCluster = new MiniBSPCluster(this.conf, numOfGroom); 
+    bspCluster.startBSPCluster();
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    try {
+      if (this.startDfs) {
+        // This spews a bunch of warnings about missing scheme. TODO: fix.
+        this.dfsCluster = new MiniDFSCluster(0, this.conf, 2, true, true, true,
+          null, null, null, null);
+
+        // mangle the conf so that the fs parameter points to the minidfs we
+        // just started up
+        FileSystem filesystem = dfsCluster.getFileSystem();
+        conf.set("fs.defaultFS", filesystem.getUri().toString());
+        Path parentdir = filesystem.getHomeDirectory();
+        
+        filesystem.mkdirs(parentdir);
+      }
+
+      // do the super setup now. if we had done it first, then we would have
+      // gotten our conf all mangled and a local fs started up.
+      super.setUp();
+
+      // start the instance
+      hamaClusterSetup();
+    } catch (Exception e) {
+      if (zooKeeperCluster != null) {
+        zooKeeperCluster.shutdown();
+      }
+      if (dfsCluster != null) {
+        shutdownDfs(dfsCluster);
+      }
+      throw e;
+    }
+  }
+
+  /** Stops the DFS, BSP and ZooKeeper mini clusters; a failure is logged, not rethrown. */
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+    try {
+      if (startDfs) shutdownDfs(dfsCluster);
+      if (bspCluster != null) bspCluster.shutdown();
+      if (zooKeeperCluster != null) zooKeeperCluster.shutdown();
+    } catch (Exception e) {
+      LOG.error("Failed to shut down the mini clusters.", e);
+    }
+  }
+}

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaTestCase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaTestCase.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaTestCase.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaTestCase.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.AssertionFailedError;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hama.util.Bytes;
+
+public abstract class HamaTestCase extends TestCase {
+  private static Log LOG = LogFactory.getLog(HamaTestCase.class);
+  
+  /** configuration parameter name for test directory */
+  public static final String TEST_DIRECTORY_KEY = "test.build.data";
+
+  private boolean localfs = false;
+  protected Path testDir = null;
+  protected FileSystem fs = null;
+  
+  static {
+    initialize();
+  }
+
+  public volatile HamaConfiguration conf;
+
+  /** constructor */
+  public HamaTestCase() {
+    super();
+    init();
+  }
+  
+  /**
+   * @param name
+   */
+  public HamaTestCase(String name) {
+    super(name);
+    init();
+  }
+  
+  private void init() {
+    conf = new HamaConfiguration();
+    conf.setStrings("bsp.local.dir", "/tmp/hama-test");
+    conf.set("bsp.master.address", "localhost");
+    conf.set("bsp.groom.report.address", "127.0.0.1:0");
+  }
+
+  /**
+   * Note that this method must be called after the mini hdfs cluster has
+   * started or we end up with a local file system.
+   */
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    localfs =
+      (conf.get("fs.defaultFS", "file:///").compareTo("file:///") == 0);
+
+    if (fs == null) {
+      this.fs = FileSystem.get(conf);
+    }
+    try {
+      if (localfs) {
+        this.testDir = getUnitTestdir(getName());
+        if (fs.exists(testDir)) {
+          fs.delete(testDir, true);
+        }
+      } else {
+        this.testDir =
+          this.fs.makeQualified(new Path("/tmp/hama-test"));
+      }
+    } catch (Exception e) {
+      LOG.fatal("error during setup", e);
+      throw e;
+    }
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    try {
+      if (localfs) {
+        if (this.fs.exists(testDir)) {
+          this.fs.delete(testDir, true);
+        }
+      }
+    } catch (Exception e) {
+      LOG.fatal("error during tear down", e);
+    }
+    super.tearDown();
+  }
+
+  protected Path getUnitTestdir(String testName) {
+    return new Path(
+        conf.get(TEST_DIRECTORY_KEY, "/tmp/hama-test/build/data"), testName);
+  }
+
+  /**
+   * Initializes the test environment; invoked from this class's static
+   * initializer.
+   *
+   * Sets the system property TEST_DIRECTORY_KEY to the absolute path of
+   * "build/hama/test" if the property is not already set. Nothing else is
+   * configured here.
+   */
+  public static void initialize() {
+    if (System.getProperty(TEST_DIRECTORY_KEY) == null) {
+      System.setProperty(TEST_DIRECTORY_KEY, new File(
+          "build/hama/test").getAbsolutePath());
+    }
+  }
+
+  /**
+   * Common method to close down a MiniDFSCluster and the associated file system
+   *
+   * @param cluster
+   */
+  public static void shutdownDfs(MiniDFSCluster cluster) {
+    if (cluster != null) {
+      LOG.info("Shutting down Mini DFS ");
+      try {
+        cluster.shutdown();
+      } catch (Exception e) {
+        /// Can get a java.lang.reflect.UndeclaredThrowableException thrown
+        // here because of an InterruptedException. Don't let exceptions in
+        // here be cause of test failure.
+      }
+      try {
+        FileSystem fs = cluster.getFileSystem();
+        if (fs != null) {
+          LOG.info("Shutting down FileSystem");
+          fs.close();
+        }
+        FileSystem.closeAll();
+      } catch (IOException e) {
+        LOG.error("error closing file system", e);
+      }
+    }
+  }
+
+  public void assertByteEquals(byte[] expected,
+                               byte[] actual) {
+    if (Bytes.compareTo(expected, actual) != 0) {
+      throw new AssertionFailedError("expected:<" +
+      Bytes.toString(expected) + "> but was:<" +
+      Bytes.toString(actual) + ">");
+    }
+  }
+
+  public static void assertEquals(byte[] expected,
+                               byte[] actual) {
+    if (Bytes.compareTo(expected, actual) != 0) {
+      throw new AssertionFailedError("expected:<" +
+      Bytes.toStringBinary(expected) + "> but was:<" +
+      Bytes.toStringBinary(actual) + ">");
+    }
+  }
+
+}

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.IOException;
+
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import static java.util.concurrent.TimeUnit.*;
+
+import static junit.framework.Assert.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hama.bsp.BSPMaster;
+import org.apache.hama.bsp.GroomServer;
+import org.apache.hama.HamaConfiguration;
+
+
+public class MiniBSPCluster {
+
+  public static final Log LOG = LogFactory.getLog(MiniBSPCluster.class);
+
+  private ScheduledExecutorService scheduler;
+
+  private HamaConfiguration configuration;
+  private BSPMasterRunner master;
+  private List<GroomServerRunner> groomServerList = 
+    new CopyOnWriteArrayList<GroomServerRunner>();
+  private int grooms;
+
+  public class BSPMasterRunner implements Runnable{
+    BSPMaster bspm;
+    HamaConfiguration conf;
+
+    public BSPMasterRunner(HamaConfiguration conf){
+      this.conf = conf;
+      if(null == this.conf) 
+        throw new NullPointerException("No Configuration for BSPMaster.");
+    }  
+
+    public void run(){
+      try{
+        LOG.info("Starting BSP Master.");
+        this.bspm = BSPMaster.startMaster(this.conf); 
+        this.bspm.offerService();
+      }catch(IOException ioe){
+        LOG.error("Fail to startup BSP Master.", ioe);
+      }catch(InterruptedException ie){
+        LOG.error("BSP Master fails in offerService().", ie);
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    public void shutdown(){
+      if(null != this.bspm) this.bspm.shutdown();
+    }
+
+    public boolean isRunning(){
+      if(null == this.bspm) return false;
+
+      if(this.bspm.currentState().equals(BSPMaster.State.RUNNING)){
+        return true;
+      } 
+      return false;
+    }
+
+    public BSPMaster getMaster(){
+      return this.bspm;
+    }
+  }
+
+  public class GroomServerRunner implements Runnable{
+    // Assigned by run() on a scheduler thread, polled by the starting thread: volatile.
+    volatile GroomServer gs;
+    HamaConfiguration conf;
+
+    public GroomServerRunner(HamaConfiguration conf){
+      this.conf = conf;
+    }
+    /** Constructs and starts the groom, then blocks in join() on what startGroomServer returns. */
+    public void run(){
+      try{
+        this.gs = GroomServer.constructGroomServer(GroomServer.class, conf);
+        GroomServer.startGroomServer(this.gs).join();
+      }catch(InterruptedException ie){
+        LOG.error("Fail to start GroomServer. ", ie);
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    public void shutdown(){
+      try{
+        if(null != this.gs) this.gs.shutdown();
+      }catch(IOException ioe){
+        LOG.info("Fail to shutdown GroomServer.", ioe);
+      }
+    }
+
+    public boolean isRunning(){
+      return null != this.gs && this.gs.isRunning();
+    }
+
+    public GroomServer getGroomServer(){
+      return this.gs;
+    }
+  }
+
+  public MiniBSPCluster(HamaConfiguration conf, int groomServers) {
+    this.configuration = conf;
+    this.grooms = groomServers;
+    if(1 > this.grooms) {
+      this.grooms = 2;  
+    }
+    LOG.info("Groom server number "+this.grooms);
+    int threadpool = conf.getInt("bsp.test.threadpool", 10);
+    LOG.info("Thread pool value "+threadpool);
+    scheduler = Executors.newScheduledThreadPool(threadpool);
+  }
+
+  public void startBSPCluster(){
+    startMaster();
+    startGroomServers();
+  }
+
+  public void shutdownBSPCluster(){
+    if(null != this.master && this.master.isRunning())
+      this.master.shutdown();
+    if(0 < groomServerList.size()){
+      for(GroomServerRunner groom: groomServerList){
+        if(groom.isRunning()) groom.shutdown();
+      }
+    }
+  }
+
+
+  public void startMaster(){
+    if(null == this.scheduler) 
+      throw new NullPointerException("No ScheduledExecutorService exists.");
+    this.master = new BSPMasterRunner(this.configuration);
+    scheduler.schedule(this.master, 0, SECONDS);
+  }
+
+  /**
+   * Waits until the master reports RUNNING, then schedules each groom in turn and
+   * waits for it to report running. Fails after more than 100 one-second polls for
+   * the master or more than 10 for a groom, and also when the wait is interrupted.
+   */
+  public void startGroomServers(){
+    if(null == this.scheduler) 
+      throw new NullPointerException("No ScheduledExecutorService exists.");
+    if(null == this.master) 
+      throw new NullPointerException("No BSPMaster exists.");
+    int cnt=0;
+    while(!this.master.isRunning()){
+      LOG.info("Waiting BSPMaster up.");
+      pause("Fail to check BSP Master's state.");
+      if(100 < ++cnt) fail("Fail to launch BSPMaster.");
+    }
+    for(int i=0; i < this.grooms; i++){
+      HamaConfiguration c = new HamaConfiguration(this.configuration);
+      randomPort(c);
+      GroomServerRunner gsr = new GroomServerRunner(c);
+      groomServerList.add(gsr);
+      scheduler.schedule(gsr, 0, SECONDS);
+      cnt = 0;
+      while(!gsr.isRunning()){
+        LOG.info("Waitin for GroomServer up.");
+        pause("Fail to check Groom Server's state.");
+        if(10 < ++cnt) fail("Fail to launch groom server.");
+      }
+    }
+  }
+
+  /** Sleeps one second; an interrupt fails the test instead of letting the caller spin. */
+  private void pause(String message){
+    try{
+      Thread.sleep(1000);
+    }catch(InterruptedException ie){
+      LOG.error(message, ie);
+      // Sleeping again with the interrupt flag set would throw again immediately.
+      Thread.currentThread().interrupt();
+      fail(message);
+    }
+  }
+
+  private void randomPort(HamaConfiguration conf){
+    try{
+      ServerSocket skt = new ServerSocket(0);
+      int p = skt.getLocalPort(); 
+      skt.close();
+      conf.set(Constants.PEER_PORT, new Integer(p).toString());
+      conf.setInt(Constants.GROOM_RPC_PORT, p+100);
+    }catch(IOException ioe){
+      LOG.error("Can not find a free port for BSPPeer.", ioe);
+    }
+  }
+
+  public void shutdown() {
+    shutdownBSPCluster();
+    scheduler.shutdown();
+  }
+  /** Wraps each groom runner in a new Thread object; the threads are returned unstarted. */
+  public List<Thread> getGroomServerThreads() {
+    List<Thread> list = new ArrayList<Thread>();
+    for(GroomServerRunner gsr: groomServerList){
+      list.add(new Thread(gsr));
+    }
+    return list;
+  }
+  /** Returns a new, unstarted Thread wrapping the master runner, not the thread it runs on. */
+  public Thread getMaster() {
+    return new Thread(this.master);
+  }
+
+  public List<GroomServer> getGroomServers(){
+    List<GroomServer> list = new ArrayList<GroomServer>();
+    for(GroomServerRunner gsr: groomServerList){
+      list.add(gsr.getGroomServer());
+    }
+    return list;
+  }
+
+  public BSPMaster getBSPMaster(){
+    if(null != this.master)
+      return this.master.getMaster();
+    return null;
+  }
+
+  public ScheduledExecutorService getScheduler(){
+    return this.scheduler;
+  }
+}

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,94 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+
+public class TestBSPMasterGroomServer extends HamaCluster {
+
+  private static Log LOG = LogFactory.getLog(TestBSPMasterGroomServer.class);
+  private static String TMP_OUTPUT = "/tmp/test-example/";
+  private HamaConfiguration configuration;
+  private String TEST_JOB = "src/test/java/testjar/testjob.jar";
+
+  public TestBSPMasterGroomServer() {
+    configuration = new HamaConfiguration();
+    configuration.set("bsp.master.address", "localhost");
+    assertEquals("Make sure master addr is set to localhost:", "localhost",
+        configuration.get("bsp.master.address"));
+    configuration.setStrings("bsp.local.dir", "/tmp/hama-test");
+    configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+    configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+  /** Submits the HelloBSP test job to the mini cluster and checks that it completes. */
+  public void testSubmitJob() throws Exception {
+    BSPJob bsp = new BSPJob(configuration);
+    bsp.setJobName("Test Serialize Printing");
+    bsp.setBspClass(testjar.ClassSerializePrinting.HelloBSP.class);
+    bsp.setJar(System.getProperty("user.dir")+"/"+TEST_JOB);
+
+    // Set the task size as a number of GroomServer
+    BSPJobClient jobClient = new BSPJobClient(configuration);
+    configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);
+    ClusterStatus cluster = jobClient.getClusterStatus(false);
+    assertEquals(this.numOfGroom, cluster.getMaxTasks());
+    
+    // TODO test the multi-tasks 
+    bsp.setNumBspTask(1);
+    
+    FileSystem fileSys = FileSystem.get(conf);
+
+    // A failed job must fail the test instead of silently skipping the output check.
+    assertTrue("BSP job did not complete successfully.", bsp.waitForCompletion(true));
+    checkOutput(fileSys, cluster, conf);
+    LOG.info("Client finishes execution job.");
+  }
+  /** Asserts that the first record of the task's output file contains the greeting. */
+  private static void checkOutput(FileSystem fileSys, ClusterStatus cluster,
+      HamaConfiguration conf) throws Exception {
+    for (int i = 0; i < 1; i++) { // TODO test the multi-tasks
+      SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(TMP_OUTPUT + i), conf);
+      try {
+        Text message = new Text();
+        assertTrue("No record found in the output file.", reader.next(new LongWritable(), message));
+        assertTrue("Check if `Hello BSP' gets printed.", message.toString().indexOf("Hello BSP from") >= 0);
+      } finally { // close the reader even when an assertion above fails
+        reader.close();
+      }
+    }
+  }
+
+  public void tearDown() throws Exception {
+    super.tearDown();
+  }
+}

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,81 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import junit.framework.TestCase;
+
+public class TestBSPMessageBundle extends TestCase {
+
+  public void testEmpty() throws IOException {
+    BSPMessageBundle bundle = new BSPMessageBundle();
+    // Serialize it.
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    bundle.write(new DataOutputStream(baos));
+    baos.close();
+    // Deserialize it.
+    BSPMessageBundle readBundle = new BSPMessageBundle();
+    readBundle.readFields(new DataInputStream(new ByteArrayInputStream(baos
+        .toByteArray())));
+    assertEquals(0, readBundle.getMessages().size());
+  }
+  /** Round-trips a bundle of 16 ByteMessages through write()/readFields() and compares tag and data. */
+  public void testSerializationDeserialization() throws IOException {
+    BSPMessageBundle bundle = new BSPMessageBundle();
+    ByteMessage[] testMessages = new ByteMessage[16];
+    for (int i = 0; i < testMessages.length; ++i) {
+      // Create a one byte tag containing the number of the message.
+      byte[] tag = new byte[1];
+      tag[0] = (byte) i;
+      // Create the data part: write(int) stores only the low-order byte, so this
+      // is a single byte holding the number of the message.
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      baos.write(i);
+      baos.close();
+      byte[] data = baos.toByteArray();
+      testMessages[i] = new ByteMessage(tag, data);
+      bundle.addMessage(testMessages[i]);
+    }
+    // Serialize it.
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    bundle.write(new DataOutputStream(baos));
+    baos.close();
+    // Deserialize it.
+    BSPMessageBundle readBundle = new BSPMessageBundle();
+    readBundle.readFields(new DataInputStream(new ByteArrayInputStream(baos
+        .toByteArray())));
+    // Check contents.
+    int messageNumber = 0;
+    for (BSPMessage message : readBundle.getMessages()) {
+      ByteMessage byteMessage = (ByteMessage) message;
+      assertTrue(Arrays.equals(testMessages[messageNumber].getTag(),
+          byteMessage.getTag()));
+      assertTrue(Arrays.equals(testMessages[messageNumber].getData(),
+          byteMessage.getData()));
+      ++messageNumber;
+    }
+    assertEquals(testMessages.length, messageNumber);
+  }
+}

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hama.bsp.BSPMaster;
+import org.apache.hama.bsp.ClusterStatus;
+
+/**
+ * Checks that a {@link ClusterStatus} survives a write/readFields round trip.
+ */
+public class TestClusterStatus extends TestCase {
+  Random rnd = new Random();
+
+  protected void setUp() throws Exception {
+    super.setUp();
+  }
+
+  /**
+   * Serializes a status built from random groom names and task counts, reads
+   * it back into an empty instance and compares the two field by field.
+   */
+  public final void testWriteAndReadFields() throws IOException {
+    // Ten random grooms; the same random number is used for the groom name
+    // and its peer name, so a repeated number simply overwrites an entry.
+    Map<String, String> grooms = new HashMap<String, String>();
+    int remaining = 10;
+    while (remaining-- > 0) {
+      int suffix = rnd.nextInt();
+      grooms.put("groom_" + suffix, "peerhost:" + suffix);
+    }
+
+    int tasks = rnd.nextInt(100);
+    int maxTasks = rnd.nextInt(100);
+
+    ClusterStatus written = new ClusterStatus(grooms, tasks, maxTasks,
+        BSPMaster.State.RUNNING);
+
+    // Serialize, then hand exactly the written bytes to the reader.
+    DataOutputBuffer out = new DataOutputBuffer();
+    written.write(out);
+    DataInputBuffer in = new DataInputBuffer();
+    in.reset(out.getData(), out.getLength());
+
+    ClusterStatus restored = new ClusterStatus();
+    restored.readFields(in);
+
+    assertEquals(written.getGroomServers(), restored.getGroomServers());
+
+    // The groom name maps must contain each other's entries, i.e. be equal.
+    Map<String, String> writtenNames = new HashMap<String, String>(
+        written.getActiveGroomNames());
+    Map<String, String> restoredNames = new HashMap<String, String>(
+        restored.getActiveGroomNames());
+    assertTrue(writtenNames.entrySet().containsAll(restoredNames.entrySet()));
+    assertTrue(restoredNames.entrySet().containsAll(writtenNames.entrySet()));
+
+    assertEquals(written.getTasks(), restored.getTasks());
+    assertEquals(written.getMaxTasks(), restored.getMaxTasks());
+  }
+}

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestMessages.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestMessages.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestMessages.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestMessages.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import junit.framework.TestCase;
+
+import org.apache.hama.util.Bytes;
+
+/**
+ * Tests construction of {@link ByteMessage} with large and small payloads.
+ */
+public class TestMessages extends TestCase {
+
+  /**
+   * Upper bound for the large payload. Kept a few elements below
+   * {@link Integer#MAX_VALUE} because JVMs commonly refuse array lengths at
+   * the very top of the int range.
+   */
+  private static final int MAX_ARRAY_LENGTH = Integer.MAX_VALUE - 8;
+
+  /**
+   * Builds one message whose payload takes about 60% of the maximum heap and
+   * one small message whose tag is the last 128 bytes of its payload.
+   */
+  public void testByteMessage() {
+    // Compute the size as a long and clamp it: casting the product straight
+    // to int saturates at Integer.MAX_VALUE once the heap is larger than
+    // roughly 3.3 GiB, which is not an allocatable array length.
+    long sixtyPercentOfHeap = (long) (Runtime.getRuntime().maxMemory() * 0.60);
+    int dataSize = (int) Math.min(sixtyPercentOfHeap, (long) MAX_ARRAY_LENGTH);
+    ByteMessage msg = new ByteMessage(Bytes.toBytes("tag"), new byte[dataSize]);
+    assertEquals(dataSize, msg.getData().length);
+    // Drop the reference so the large payload is eligible for collection.
+    msg = null;
+
+    byte[] dummyData = new byte[1024];
+    ByteMessage msg2 = new ByteMessage(Bytes.tail(dummyData, 128), dummyData);
+    // The 128-byte tag must compare equal to the last 128 bytes of the data.
+    assertEquals(0,
+        Bytes.compareTo(msg2.getTag(), 0, 128, msg2.getData(),
+            msg2.getData().length - 128, 128));
+  }
+}

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/ipc/TestIPC.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/ipc/TestIPC.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/ipc/TestIPC.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Exercises the Hadoop IPC {@link Client} and {@link Server} classes with
+ * echo servers that are called from several threads at once.
+ */
+public class TestIPC extends TestCase {
+  public static final Log LOG = LogFactory.getLog(TestIPC.class);
+
+  final private static Configuration conf = new Configuration();
+  final static private int PING_INTERVAL = 1000;
+
+  static {
+    Client.setPingInterval(conf, PING_INTERVAL);
+  }
+
+  public TestIPC(String name) {
+    super(name);
+  }
+
+  private static final Random RANDOM = new Random();
+
+  private static final String ADDRESS = "0.0.0.0";
+
+  /**
+   * Server that returns every parameter unchanged, optionally after sleeping
+   * for a random time below twice the ping interval.
+   */
+  private static class TestServer extends Server {
+    private boolean sleep;
+
+    public TestServer(int handlerCount, boolean sleep) throws IOException {
+      // Port 0 is passed so the tests do not depend on a fixed port.
+      super(ADDRESS, 0, LongWritable.class, handlerCount, conf);
+      this.sleep = sleep;
+    }
+
+    @Override
+    public Writable call(Class<?> protocol, Writable param, long receiveTime)
+        throws IOException {
+      if (sleep) {
+        try {
+          Thread.sleep(RANDOM.nextInt(2 * PING_INTERVAL)); // sleep a bit
+        } catch (InterruptedException ignored) {
+          // Deliberately ignored: an interrupt only ends the artificial
+          // delay early, the parameter is still echoed below.
+        }
+      }
+      return param; // echo param as result
+    }
+  }
+
+  /**
+   * Thread that sends {@code count} random longs to one server, one call at
+   * a time, and records whether any call failed or was not echoed.
+   */
+  private static class SerialCaller extends Thread {
+    private Client client;
+    private InetSocketAddress server;
+    private int count;
+    // Written by this thread and read by the test only after join().
+    private boolean failed;
+
+    public SerialCaller(Client client, InetSocketAddress server, int count) {
+      this.client = client;
+      this.server = server;
+      this.count = count;
+    }
+
+    public void run() {
+      for (int i = 0; i < count; i++) {
+        try {
+          LongWritable param = new LongWritable(RANDOM.nextLong());
+          LongWritable value = (LongWritable) client.call(param, server, null,
+              null);
+          if (!param.equals(value)) {
+            LOG.fatal("Call failed!");
+            failed = true;
+            break;
+          }
+        } catch (Exception e) {
+          LOG.fatal("Caught: " + StringUtils.stringifyException(e));
+          failed = true;
+        }
+      }
+    }
+  }
+
+  /**
+   * Thread that, {@code count} times, sends one random long to every address
+   * in a single multi-call and records whether any echo did not match.
+   */
+  private static class ParallelCaller extends Thread {
+    private Client client;
+    private int count;
+    private InetSocketAddress[] addresses;
+    // Written by this thread and read by the test only after join().
+    private boolean failed;
+
+    public ParallelCaller(Client client, InetSocketAddress[] addresses,
+        int count) {
+      this.client = client;
+      this.addresses = addresses;
+      this.count = count;
+    }
+
+    public void run() {
+      for (int i = 0; i < count; i++) {
+        try {
+          Writable[] params = new Writable[addresses.length];
+          for (int j = 0; j < addresses.length; j++)
+            params[j] = new LongWritable(RANDOM.nextLong());
+          Writable[] values = client.call(params, addresses, null, null);
+          for (int j = 0; j < addresses.length; j++) {
+            if (!params[j].equals(values[j])) {
+              LOG.fatal("Call failed!");
+              failed = true;
+              break;
+            }
+          }
+        } catch (Exception e) {
+          LOG.fatal("Caught: " + StringUtils.stringifyException(e));
+          failed = true;
+        }
+      }
+    }
+  }
+
+  /** Stops every client that was actually created; null slots are skipped. */
+  private static void stopClients(Client[] clients) {
+    for (int i = 0; i < clients.length; i++) {
+      if (clients[i] != null) {
+        clients[i].stop();
+      }
+    }
+  }
+
+  /** Stops every server that was actually created; null slots are skipped. */
+  private static void stopServers(Server[] servers) {
+    for (int i = 0; i < servers.length; i++) {
+      if (servers[i] != null) {
+        servers[i].stop();
+      }
+    }
+  }
+
+  public void testSerial() throws Exception {
+    testSerial(3, false, 2, 5, 100);
+  }
+
+  /**
+   * Runs {@code callerCount} serial callers, sharing {@code clientCount}
+   * clients round-robin, against a single echo server.
+   *
+   * @param handlerCount number of handlers of the server
+   * @param handlerSleep whether the server delays each call
+   * @param clientCount number of clients to create
+   * @param callerCount number of caller threads to run
+   * @param callCount number of calls made by each caller
+   * @throws Exception if setup fails or the test thread is interrupted
+   */
+  public void testSerial(int handlerCount, boolean handlerSleep,
+      int clientCount, int callerCount, int callCount) throws Exception {
+    Server server = new TestServer(handlerCount, handlerSleep);
+    Client[] clients = new Client[clientCount];
+    try {
+      InetSocketAddress addr = NetUtils.getConnectAddress(server);
+      server.start();
+
+      for (int i = 0; i < clientCount; i++) {
+        clients[i] = new Client(LongWritable.class, conf);
+      }
+
+      SerialCaller[] callers = new SerialCaller[callerCount];
+      for (int i = 0; i < callerCount; i++) {
+        callers[i] = new SerialCaller(clients[i % clientCount], addr,
+            callCount);
+        callers[i].start();
+      }
+      // Wait for every caller before asserting so that a failure does not
+      // leave caller threads running against stopped clients.
+      for (int i = 0; i < callerCount; i++) {
+        callers[i].join();
+      }
+      for (int i = 0; i < callerCount; i++) {
+        assertFalse(callers[i].failed);
+      }
+    } finally {
+      // Release clients and server even when an assertion failed.
+      try {
+        stopClients(clients);
+      } finally {
+        server.stop();
+      }
+    }
+  }
+
+  public void testParallel() throws Exception {
+    testParallel(10, false, 2, 4, 2, 4, 100);
+  }
+
+  /**
+   * Runs {@code callerCount} parallel callers, sharing {@code clientCount}
+   * clients round-robin, against {@code addressCount} addresses that are
+   * spread round-robin over {@code serverCount} echo servers.
+   *
+   * @param handlerCount number of handlers per server
+   * @param handlerSleep whether the servers delay each call
+   * @param serverCount number of servers to start
+   * @param addressCount number of addresses targeted by each multi-call
+   * @param clientCount number of clients to create
+   * @param callerCount number of caller threads to run
+   * @param callCount number of multi-calls made by each caller
+   * @throws Exception if setup fails or the test thread is interrupted
+   */
+  public void testParallel(int handlerCount, boolean handlerSleep,
+      int serverCount, int addressCount, int clientCount, int callerCount,
+      int callCount) throws Exception {
+    Server[] servers = new Server[serverCount];
+    Client[] clients = new Client[clientCount];
+    try {
+      for (int i = 0; i < serverCount; i++) {
+        servers[i] = new TestServer(handlerCount, handlerSleep);
+        servers[i].start();
+      }
+
+      InetSocketAddress[] addresses = new InetSocketAddress[addressCount];
+      for (int i = 0; i < addressCount; i++) {
+        addresses[i] = NetUtils.getConnectAddress(servers[i % serverCount]);
+      }
+
+      for (int i = 0; i < clientCount; i++) {
+        clients[i] = new Client(LongWritable.class, conf);
+      }
+
+      ParallelCaller[] callers = new ParallelCaller[callerCount];
+      for (int i = 0; i < callerCount; i++) {
+        callers[i] = new ParallelCaller(clients[i % clientCount], addresses,
+            callCount);
+        callers[i].start();
+      }
+      // Wait for every caller before asserting so that a failure does not
+      // leave caller threads running against stopped clients.
+      for (int i = 0; i < callerCount; i++) {
+        callers[i].join();
+      }
+      for (int i = 0; i < callerCount; i++) {
+        assertFalse(callers[i].failed);
+      }
+    } finally {
+      // Release clients and servers even when an assertion failed.
+      try {
+        stopClients(clients);
+      } finally {
+        stopServers(servers);
+      }
+    }
+  }
+
+  /**
+   * Calls an address on which this test starts no server and checks that the
+   * resulting IOException names the address and carries its cause's message.
+   */
+  public void testStandAloneClient() throws Exception {
+    testParallel(10, false, 2, 4, 2, 4, 100);
+    Client client = new Client(LongWritable.class, conf);
+    InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
+    try {
+      client.call(new LongWritable(RANDOM.nextLong()), address, null, null);
+      fail("Expected an exception to have been thrown");
+    } catch (IOException e) {
+      String message = e.getMessage();
+      // Fail with a readable assertion instead of a NullPointerException
+      // when the exception carries no message.
+      assertNotNull("No message in " + e, message);
+      String addressText = address.toString();
+      assertTrue("Did not find " + addressText + " in " + message, message
+          .contains(addressText));
+      Throwable cause = e.getCause();
+      assertNotNull("No nested exception in " + e, cause);
+      String causeText = cause.getMessage();
+      assertNotNull("No message in nested exception " + cause, causeText);
+      assertTrue("Did not find " + causeText + " in " + message, message
+          .contains(causeText));
+    } finally {
+      client.stop();
+    }
+  }
+
+}

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/ipc/TestRPC.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/ipc/TestRPC.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/ipc/TestRPC.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+import junit.framework.TestCase;
+
+/**
+ * Exercises Hadoop RPC through a small echo/add protocol served and called
+ * within the same JVM.
+ */
+public class TestRPC extends TestCase {
+  private static final int PORT = 1234;
+  private static final String ADDRESS = "0.0.0.0";
+
+  // Named after this class rather than the Hadoop test of the same name.
+  public static final Log LOG = LogFactory.getLog(TestRPC.class);
+
+  private static Configuration conf = new Configuration();
+
+  public TestRPC(String name) {
+    super(name);
+  }
+
+  /** Protocol offered by the test server. */
+  public interface TestProtocol extends VersionedProtocol {
+    public static final long versionID = 1L;
+
+    void ping() throws IOException;
+
+    String echo(String value) throws IOException;
+
+    String[] echo(String[] value) throws IOException;
+
+    Writable echo(Writable value) throws IOException;
+
+    int add(int v1, int v2) throws IOException;
+
+    int add(int[] values) throws IOException;
+
+    int error() throws IOException;
+
+    void testServerGet() throws IOException;
+  }
+
+  /** Server-side implementation of {@link TestProtocol}. */
+  public class TestImpl implements TestProtocol {
+
+    public long getProtocolVersion(String protocol, long clientVersion) {
+      return TestProtocol.versionID;
+    }
+
+    public void ping() {
+    }
+
+    public String echo(String value) throws IOException {
+      return value;
+    }
+
+    public String[] echo(String[] values) throws IOException {
+      return values;
+    }
+
+    public Writable echo(Writable writable) {
+      return writable;
+    }
+
+    public int add(int v1, int v2) {
+      return v1 + v2;
+    }
+
+    public int add(int[] values) {
+      int sum = 0;
+      for (int i = 0; i < values.length; i++) {
+        sum += values[i];
+      }
+      return sum;
+    }
+
+    /** Always fails, so the test can check that exceptions are propagated. */
+    public int error() throws IOException {
+      throw new IOException("bobo");
+    }
+
+    /** Fails unless {@code Server.get()} returns an {@link RPC.Server}. */
+    public void testServerGet() throws IOException {
+      if (!(Server.get() instanceof RPC.Server)) {
+        throw new IOException("Server.get() failed");
+      }
+    }
+
+  }
+
+  /**
+   * Starts a server, calls every protocol method through a proxy, then
+   * issues two multi-calls. The server is stopped even when a check fails.
+   */
+  public void testCalls() throws Exception {
+    Server server = RPC.getServer(new TestImpl(), ADDRESS, PORT, conf);
+    try {
+      server.start();
+
+      InetSocketAddress addr = new InetSocketAddress(PORT);
+      TestProtocol proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
+          TestProtocol.versionID, addr, conf);
+
+      proxy.ping();
+
+      String stringResult = proxy.echo("foo");
+      assertEquals("foo", stringResult);
+
+      stringResult = proxy.echo((String) null);
+      assertNull(stringResult);
+
+      String[] stringResults = proxy.echo(new String[] { "foo", "bar" });
+      assertTrue(Arrays.equals(stringResults, new String[] { "foo", "bar" }));
+
+      stringResults = proxy.echo((String[]) null);
+      assertNull(stringResults);
+
+      int intResult = proxy.add(1, 2);
+      assertEquals(3, intResult);
+
+      intResult = proxy.add(new int[] { 1, 2 });
+      assertEquals(3, intResult);
+
+      // The server-side IOException must surface on the client.
+      boolean caught = false;
+      try {
+        proxy.error();
+      } catch (IOException e) {
+        LOG.debug("Caught " + e);
+        caught = true;
+      }
+      assertTrue(caught);
+
+      proxy.testServerGet();
+
+      // try some multi-calls
+      Method echo = TestProtocol.class.getMethod("echo",
+          new Class[] { String.class });
+      String[] strings = (String[]) RPC.call(echo, new String[][] { { "a" },
+          { "b" } }, new InetSocketAddress[] { addr, addr }, null, conf);
+      assertTrue(Arrays.equals(strings, new String[] { "a", "b" }));
+
+      Method ping = TestProtocol.class.getMethod("ping", new Class[] {});
+      Object[] voids = (Object[]) RPC.call(ping, new Object[][] { {}, {} },
+          new InetSocketAddress[] { addr, addr }, null, conf);
+      assertNull(voids);
+    } finally {
+      // Free the fixed port even when one of the checks above failed.
+      server.stop();
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+
+    new TestRPC("test").testCalls();
+
+  }
+
+}

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestBytes.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestBytes.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestBytes.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestBytes.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests for the {@link Bytes} conversion, split, search and increment
+ * helpers.
+ */
+public class TestBytes extends TestCase {
+
+  /** Hashing a null array is expected to throw. */
+  public void testNullHashCode() {
+    byte[] nothing = null;
+    Exception thrown = null;
+    try {
+      Bytes.hashCode(nothing);
+    } catch (Exception e) {
+      thrown = e;
+    }
+    assertNotNull(thrown);
+  }
+
+  /** Splits ranges of three-letter keys and checks where "CCC" lands. */
+  public void testSplit() throws Exception {
+    byte[] lowest = Bytes.toBytes("AAA");
+    byte[] middle = Bytes.toBytes("CCC");
+
+    // One split point between AAA and EEE gives three parts, CCC in the
+    // middle.
+    byte[][] halves = Bytes.split(lowest, Bytes.toBytes("EEE"), 1);
+    printParts(halves);
+    assertEquals(3, halves.length);
+    assertTrue(Bytes.equals(halves[1], middle));
+
+    // Now divide into three parts.  The upper bound changes so the split is
+    // even.
+    byte[][] thirds = Bytes.split(lowest, Bytes.toBytes("DDD"), 2);
+    printParts(thirds);
+    assertEquals(4, thirds.length);
+    // Assert that 3rd part is 'CCC'.
+    assertTrue(Bytes.equals(thirds[2], middle));
+  }
+
+  /** Splits a range of keys that share the "http://" prefix. */
+  public void testSplit2() throws Exception {
+    byte[] lowest = Bytes.toBytes("http://A");
+    byte[] highest = Bytes.toBytes("http://z");
+    byte[] middle = Bytes.toBytes("http://]");
+
+    byte[][] parts = Bytes.split(lowest, highest, 1);
+    printParts(parts);
+    assertEquals(3, parts.length);
+    assertTrue(Bytes.equals(parts[1], middle));
+  }
+
+  /** Prints every part on its own line of standard output. */
+  private static void printParts(byte[][] parts) {
+    for (byte[] part : parts) {
+      System.out.println(Bytes.toString(part));
+    }
+  }
+
+  /** Longs must survive a toBytes/toLong round trip. */
+  public void testToLong() throws Exception {
+    long[] samples = { -1L, 123L, 122232323232L };
+    for (long sample : samples) {
+      byte[] encoded = Bytes.toBytes(sample);
+      assertEquals(sample, Bytes.toLong(encoded));
+    }
+  }
+
+  /** Floats must survive a toBytes/toFloat round trip. */
+  public void testToFloat() throws Exception {
+    float[] samples = { -1f, 123.123f, Float.MAX_VALUE };
+    for (float sample : samples) {
+      byte[] encoded = Bytes.toBytes(sample);
+      assertEquals(sample, Bytes.toFloat(encoded));
+    }
+  }
+
+  /** Doubles must survive a toBytes/toDouble round trip. */
+  public void testToDouble() throws Exception {
+    double[] samples = { Double.MIN_VALUE, Double.MAX_VALUE };
+    for (double sample : samples) {
+      byte[] encoded = Bytes.toBytes(sample);
+      assertEquals(sample, Bytes.toDouble(encoded));
+    }
+  }
+
+  /**
+   * Searches a sorted table of one-byte rows using one-byte windows (offset
+   * 0 or 1, length 1) of two-byte keys.
+   */
+  public void testBinarySearch() throws Exception {
+    byte[][] rows = { { 1 }, { 3 }, { 5 }, { 7 }, { 9 }, { 11 }, { 13 },
+        { 15 } };
+    byte[] key1 = { 3, 1 };
+    byte[] key2 = { 4, 9 };
+    byte[] key2Single = { 4 };
+    byte[] key3 = { 5, 11 };
+
+    assertEquals(1,
+        Bytes.binarySearch(rows, key1, 0, 1, Bytes.BYTES_RAWCOMPARATOR));
+    assertEquals(0,
+        Bytes.binarySearch(rows, key1, 1, 1, Bytes.BYTES_RAWCOMPARATOR));
+    // A missing key is expected to give the same negative result from
+    // java.util.Arrays and from Bytes.
+    assertEquals(-(2 + 1),
+        Arrays.binarySearch(rows, key2Single, Bytes.BYTES_COMPARATOR));
+    assertEquals(-(2 + 1),
+        Bytes.binarySearch(rows, key2, 0, 1, Bytes.BYTES_RAWCOMPARATOR));
+    assertEquals(4,
+        Bytes.binarySearch(rows, key2, 1, 1, Bytes.BYTES_RAWCOMPARATOR));
+    assertEquals(2,
+        Bytes.binarySearch(rows, key3, 0, 1, Bytes.BYTES_RAWCOMPARATOR));
+    assertEquals(5,
+        Bytes.binarySearch(rows, key3, 1, 1, Bytes.BYTES_RAWCOMPARATOR));
+  }
+
+  /** Runs the increment check over a table of {value, amount} pairs. */
+  public void testIncrementBytes() throws IOException {
+    long[][] cases = {
+        { 10, 1 },
+        { 12, 123435445 },
+        { 124634654, 1 },
+        { 10005460, 5005645 },
+        { 1, -1 },
+        { 10, -1 },
+        { 10, -5 },
+        { 1005435000, -5 },
+        { 10, -43657655 },
+        { -1, 1 },
+        { -26, 5034520 },
+        { -10657200, 5 },
+        { -12343250, 45376475 },
+        { -10, -5 },
+        { -12343250, -5 },
+        { -12, -34565445 },
+        { -1546543452, -34565445 },
+    };
+    for (long[] pair : cases) {
+      assertTrue(checkTestIncrementBytes(pair[0], pair[1]));
+    }
+  }
+
+  /**
+   * Returns whether incrementing the encoded {@code val} by {@code amount}
+   * decodes to {@code val + amount}.
+   */
+  private static boolean checkTestIncrementBytes(long val, long amount)
+      throws IOException {
+    byte[] value = Bytes.toBytes(val);
+    // Reference buffer: zero-filled when the leading byte is positive,
+    // otherwise filled with 0xFF, then overlaid right-aligned with the
+    // encoded value.
+    byte[] reference;
+    if (value[0] > 0) {
+      reference = new byte[Bytes.SIZEOF_LONG];
+    } else {
+      reference = new byte[] { -1, -1, -1, -1, -1, -1, -1, -1 };
+    }
+    int offset = reference.length - value.length;
+    System.arraycopy(value, 0, reference, offset, value.length);
+
+    // The reference copy is taken before this call, which receives the
+    // encoded array itself.
+    byte[] incremented = Bytes.incrementBytes(value, amount);
+    long actual = Bytes.toLong(incremented);
+    long expected = Bytes.toLong(reference) + amount;
+    return expected == actual;
+  }
+}

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestNumeric.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestNumeric.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestNumeric.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestNumeric.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,34 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import junit.framework.TestCase;
+
+/**
+ * Numeric conversion tests for {@link Bytes}.
+ */
+public class TestNumeric extends TestCase {
+  final static int TEST_INT = 3;
+  final static double TEST_DOUBLE = 0.4;
+
+  /**
+   * Double conversion test: the value must come back exactly after being
+   * converted to bytes and decoded again.
+   */
+  public void testDouble() {
+    // Expected value first so a failure reports the values the right way
+    // round; a delta of 0.0 demands an exact match.
+    assertEquals(TEST_DOUBLE, Bytes.toDouble(Bytes.toBytes(TEST_DOUBLE)), 0.0);
+  }
+}

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestRandomVariable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestRandomVariable.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestRandomVariable.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestRandomVariable.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,74 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import org.apache.log4j.Logger;
+
+import junit.framework.TestCase;
+
+/**
+ * Random variable generation test
+ */
+public class TestRandomVariable extends TestCase {
+  static final Logger LOG = Logger.getLogger(TestRandomVariable.class);
+  final static int COUNT = 50;
+
+  /**
+   * Draws COUNT samples from rand() and checks each lies in [0, 1].
+   * 
+   * @throws Exception
+   */
+  public void testRand() throws Exception {
+    int remaining = COUNT;
+    while (remaining-- > 0) {
+      double sample = RandomVariable.rand();
+      boolean withinUnitInterval = sample >= 0.0d && sample <= 1.0;
+      assertTrue(withinUnitInterval);
+    }
+  }
+
+  /**
+   * Draws COUNT samples from randInt() and checks each lies in [122, 561].
+   * 
+   * @throws Exception
+   */
+  public void testRandInt() throws Exception {
+    final int min = 122;
+    final int max = 561;
+
+    int remaining = COUNT;
+    while (remaining-- > 0) {
+      int sample = RandomVariable.randInt(min, max);
+      boolean withinBounds = sample >= min && sample <= max;
+      assertTrue(withinBounds);
+    }
+  }
+
+  /**
+   * Draws COUNT samples from uniform() and checks each lies in [1.0, 3.0].
+   * 
+   * @throws Exception
+   */
+  public void testUniform() throws Exception {
+    final double min = 1.0d;
+    final double max = 3.0d;
+
+    int remaining = COUNT;
+    while (remaining-- > 0) {
+      double sample = RandomVariable.uniform(min, max);
+      boolean withinBounds = sample >= min && sample <= max;
+      assertTrue(withinBounds);
+    }
+  }
+}

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/zookeeper/TestZKTools.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/zookeeper/TestZKTools.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/zookeeper/TestZKTools.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/zookeeper/TestZKTools.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.zookeeper;
+
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests the ZooKeeper quorum string built by {@link QuorumPeer}.
+ */
+public class TestZKTools extends TestCase {
+
+  /**
+   * Configures a quorum entry that carries its own port plus a separate
+   * client port, and expects the resulting quorum string to pair the host
+   * with the client port.
+   */
+  public void testZKProps() {
+    final String expected = "test.com:2222";
+
+    HamaConfiguration conf = new HamaConfiguration();
+    conf.set(Constants.ZOOKEEPER_QUORUM, "test.com:123");
+    conf.set(Constants.ZOOKEEPER_CLIENT_PORT, "2222");
+
+    assertEquals(expected, QuorumPeer.getZKQuorumServersString(conf));
+  }
+}

Added: incubator/hama/trunk/core/src/test/java/testjar/ClassSerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/testjar/ClassSerializePrinting.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/testjar/ClassSerializePrinting.java (added)
+++ incubator/hama/trunk/core/src/test/java/testjar/ClassSerializePrinting.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package testjar;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Test job in which the peers, one superstep at a time, each write a
+ * greeting record to a sequence file below {@code /tmp/test-example/}.
+ */
+public class ClassSerializePrinting {
+  private static final String TMP_OUTPUT = "/tmp/test-example/";
+
+  /**
+   * BSP task that takes one turn per known peer name; on the turn matching
+   * its own name it writes a greeting file, then sleeps and synchronizes.
+   */
+  public static class HelloBSP extends BSP {
+    public static final Log LOG = LogFactory.getLog(HelloBSP.class);
+    private static final int PRINT_INTERVAL = 1000;
+    private Configuration conf;
+    private FileSystem fileSys;
+    private int num;
+
+    /**
+     * Runs one superstep per entry of {@code bspPeer.getAllPeerNames()}.
+     *
+     * @param bspPeer the peer executing this task
+     * @throws IOException if the greeting file cannot be written
+     * @throws KeeperException if raised by a peer operation
+     * @throws InterruptedException if the task is interrupted
+     */
+    @Override
+    public void bsp(BSPPeer bspPeer) throws IOException,
+        KeeperException, InterruptedException {
+      // The name of the local peer is the same in every superstep.
+      String peerName = bspPeer.getPeerName();
+
+      int i = 0;
+      for (String otherPeer : bspPeer.getAllPeerNames()) {
+        if (peerName.equals(otherPeer)) {
+          writeLogToFile(peerName, i);
+        }
+
+        Thread.sleep(PRINT_INTERVAL);
+        bspPeer.sync();
+        i++;
+      }
+    }
+
+    /**
+     * Writes a single (timestamp, greeting) record to the file
+     * {@code TMP_OUTPUT + i}.
+     *
+     * @param string name of the peer issuing the greeting
+     * @param i zero-based turn index, also used as the output file name
+     * @throws IOException if no file system is available or the write fails
+     */
+    private void writeLogToFile(String string, int i) throws IOException {
+      if (fileSys == null) {
+        // setConf() could not obtain a file system; fail with a clear cause
+        // rather than a NullPointerException further down.
+        throw new IOException("No file system available for " + TMP_OUTPUT + i);
+      }
+
+      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+          new Path(TMP_OUTPUT + i), LongWritable.class, Text.class,
+          CompressionType.NONE);
+      try {
+        writer.append(new LongWritable(System.currentTimeMillis()), new Text(
+            "Hello BSP from " + (i + 1) + " of " + num + ": " + string));
+      } finally {
+        // Release the writer even when append() throws.
+        writer.close();
+      }
+    }
+
+    /** Returns the configuration last passed to {@code setConf}. */
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+    /**
+     * Stores the configuration, parses the {@code bsp.peers.num} setting
+     * and looks up the file system to write to.
+     *
+     * @param conf configuration of the running task
+     */
+    @Override
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+      num = Integer.parseInt(conf.get("bsp.peers.num"));
+      try {
+        fileSys = FileSystem.get(conf);
+      } catch (IOException e) {
+        // This method declares no checked exceptions, so log the failure
+        // here; writeLogToFile() reports it when the file system is needed.
+        LOG.error("Could not obtain the file system", e);
+      }
+    }
+
+  }
+
+}

Added: incubator/hama/trunk/core/src/test/java/testjar/testjob.jar
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/testjar/testjob.jar?rev=1152788&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/hama/trunk/core/src/test/java/testjar/testjob.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Propchange: incubator/hama/trunk/examples/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Mon Aug  1 14:12:46 2011
@@ -0,0 +1,3 @@
+*.iml
+
+target

Added: incubator/hama/trunk/examples/pom.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/pom.xml?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/examples/pom.xml (added)
+++ incubator/hama/trunk/examples/pom.xml Mon Aug  1 14:12:46 2011
@@ -0,0 +1,76 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <parent> <!-- settings are inherited from the hama-parent POM -->
+    <groupId>org.apache.hama</groupId>
+    <artifactId>hama-parent</artifactId>
+    <version>0.4.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.hama</groupId>
+  <artifactId>hama-examples</artifactId>
+  <name>Apache Hama Examples</name>
+  <version>0.4.0-incubating-SNAPSHOT</version>
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <dependency> <!-- the examples build against hama-core of the same version -->
+      <groupId>org.apache.hama</groupId>
+      <artifactId>hama-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+  <build>
+    <finalName>hama-examples-${project.version}</finalName>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <artifactId>maven-surefire-plugin</artifactId>
+          <configuration>
+            <skipTests>true</skipTests> <!-- surefire tests are not run for this module -->
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>1.4</version>
+        <executions>
+          <execution>
+            <phase>package</phase> <!-- the shade goal runs during the package phase -->
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <!-- sets the manifest main class of the shaded jar -->
+                  <mainClass>org.apache.hama.examples.ExampleDriver</mainClass>
+                </transformer>
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

Added: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java (added)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,54 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import org.apache.hadoop.util.ProgramDriver;
+import org.apache.hama.examples.graph.PageRank;
+import org.apache.hama.examples.graph.ShortestPaths;
+
+/**
+ * Command-line entry point that registers the Hama example programs under
+ * short names.
+ */
+public class ExampleDriver {
+
+  /**
+   * Registers the example programs and hands the arguments to
+   * {@code ProgramDriver.driver}; exits with status -1 if anything fails.
+   *
+   * @param args command-line arguments, passed through unchanged
+   */
+  public static void main(String[] args) {
+    ProgramDriver pgd = new ProgramDriver();
+    try {
+      pgd.addClass("pi", PiEstimator.class, "Pi Estimator");
+      pgd.addClass("bench", RandBench.class, "Random Communication Benchmark");
+      pgd.addClass("test", SerializePrinting.class, "Serialize Printing Test");
+      pgd.addClass("sssp", ShortestPaths.class, "Single Source Shortest Path");
+      pgd.addClass("pagerank", PageRank.class, "PageRank");
+
+      pgd.driver(args);
+    } catch (Throwable e) {
+      e.printStackTrace();
+      // Signal the failure to the calling shell instead of exiting with 0.
+      System.exit(-1);
+    }
+  }
+}

Added: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java (added)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPJobClient;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.ClusterStatus;
+import org.apache.hama.bsp.DoubleMessage;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Example job that estimates pi by Monte Carlo sampling: every peer samples
+ * random points and one master peer averages the per-peer estimates.
+ */
+public class PiEstimator {
+  private static final String MASTER_TASK = "master.task.";
+  private static final Path TMP_OUTPUT = new Path("/tmp/pi-example/output");
+
+  /**
+   * BSP task that computes a local estimate, sends it to the master peer and,
+   * on the master, writes the mean of all received estimates.
+   */
+  public static class MyEstimator extends BSP {
+    public static final Log LOG = LogFactory.getLog(MyEstimator.class);
+    private static final int iterations = 10000;
+    private Configuration conf;
+    private String masterTask;
+
+    /**
+     * Samples points in the square [-1, 1) x [-1, 1), sends the local
+     * estimate to the master and lets the master aggregate after the sync.
+     *
+     * @param bspPeer the peer executing this task
+     * @throws IOException if sending or writing the result fails
+     * @throws KeeperException if raised by a peer operation
+     * @throws InterruptedException if the task is interrupted
+     */
+    @Override
+    public void bsp(BSPPeer bspPeer) throws IOException,
+        KeeperException, InterruptedException {
+      int in = 0;
+      for (int i = 0; i < iterations; i++) {
+        double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
+        // The squared distance is below 1 exactly when the distance is.
+        if (x * x + y * y < 1.0) {
+          in++;
+        }
+      }
+
+      double data = 4.0 * (double) in / (double) iterations;
+      DoubleMessage estimate = new DoubleMessage(bspPeer.getPeerName(), data);
+
+      bspPeer.send(masterTask, estimate);
+      bspPeer.sync();
+
+      if (bspPeer.getPeerName().equals(masterTask)) {
+        double pi = 0.0;
+        int numPeers = bspPeer.getNumCurrentMessages();
+        DoubleMessage received;
+        while ((received = (DoubleMessage) bspPeer.getCurrentMessage()) != null) {
+          pi += received.getData();
+        }
+
+        pi = pi / numPeers;
+        writeResult(pi);
+      }
+    }
+
+    /**
+     * Writes the estimate as the key of a single record in the output file.
+     *
+     * @param pi the aggregated estimate
+     * @throws IOException if the file cannot be created or written
+     */
+    private void writeResult(double pi) throws IOException {
+      FileSystem fileSys = FileSystem.get(conf);
+
+      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+          TMP_OUTPUT, DoubleWritable.class, DoubleWritable.class,
+          CompressionType.NONE);
+      try {
+        writer.append(new DoubleWritable(pi), new DoubleWritable(0));
+      } finally {
+        // Release the writer even when append() throws.
+        writer.close();
+      }
+    }
+
+    /** Returns the configuration last passed to {@code setConf}. */
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+    /**
+     * Stores the configuration and reads the master peer name from it.
+     *
+     * @param conf configuration of the running task
+     */
+    @Override
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+      this.masterTask = conf.get(MASTER_TASK);
+    }
+
+  }
+
+  /** Removes output left over from a previous run, if any. */
+  private static void initTempDir(FileSystem fileSys) throws IOException {
+    if (fileSys.exists(TMP_OUTPUT)) {
+      fileSys.delete(TMP_OUTPUT, true);
+    }
+  }
+
+  /**
+   * Reads the first record of the output file and prints its key.
+   *
+   * @param fileSys file system holding the output file
+   * @param conf configuration used to open the file
+   * @throws IOException if the file cannot be opened or read
+   */
+  private static void printOutput(FileSystem fileSys, HamaConfiguration conf)
+      throws IOException {
+    DoubleWritable output = new DoubleWritable();
+    DoubleWritable zero = new DoubleWritable();
+    SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, TMP_OUTPUT,
+        conf);
+    try {
+      reader.next(output, zero);
+    } finally {
+      // Release the reader even when next() throws.
+      reader.close();
+    }
+
+    System.out.println("Estimated value of PI is " + output);
+  }
+
+  /**
+   * Configures and submits the job; on success prints the estimate and the
+   * elapsed time.
+   *
+   * @param args optional; {@code args[0]} is the number of BSP tasks
+   */
+  public static void main(String[] args) throws InterruptedException,
+      IOException, ClassNotFoundException {
+    // BSP job configuration
+    HamaConfiguration conf = new HamaConfiguration();
+
+    BSPJob bsp = new BSPJob(conf, PiEstimator.class);
+    // Set the job name
+    bsp.setJobName("Pi Estimation Example");
+    bsp.setBspClass(MyEstimator.class);
+
+    BSPJobClient jobClient = new BSPJobClient(conf);
+    ClusterStatus cluster = jobClient.getClusterStatus(true);
+
+    if (args.length > 0) {
+      bsp.setNumBspTask(Integer.parseInt(args[0]));
+    } else {
+      // Set to maximum
+      bsp.setNumBspTask(cluster.getMaxTasks());
+    }
+
+    // Choose one as a master
+    for (String peerName : cluster.getActiveGroomNames().values()) {
+      conf.set(MASTER_TASK, peerName);
+      break;
+    }
+
+    FileSystem fileSys = FileSystem.get(conf);
+    initTempDir(fileSys);
+
+    long startTime = System.currentTimeMillis();
+
+    if (bsp.waitForCompletion(true)) {
+      printOutput(fileSys, conf);
+
+      System.out.println("Job Finished in "
+          + (double) (System.currentTimeMillis() - startTime) / 1000.0
+          + " seconds");
+    }
+  }
+}

Added: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java (added)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPJobClient;
+import org.apache.hama.bsp.BSPMessage;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.ByteMessage;
+import org.apache.hama.bsp.ClusterStatus;
+import org.apache.hama.util.Bytes;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Benchmark job in which every peer sends fixed-size messages to randomly
+ * chosen peers for a configurable number of supersteps.
+ */
+public class RandBench {
+  private static final String SIZEOFMSG = "msg.size";
+  private static final String N_COMMUNICATIONS = "communications.num";
+  private static final String N_SUPERSTEPS = "supersteps.num";
+
+  /**
+   * BSP task that performs the random sends and logs what it receives.
+   */
+  public static class RandBSP extends BSP {
+    public static final Log LOG = LogFactory.getLog(RandBSP.class);
+    private final Random r = new Random();
+    private Configuration conf;
+    private int sizeOfMsg;
+    private int nCommunications;
+    private int nSupersteps;
+
+    /**
+     * In every superstep sends {@code nCommunications} messages carrying
+     * {@code sizeOfMsg} bytes to random peers, synchronizes, and logs the tag
+     * and payload length of each message received.
+     *
+     * @param bspPeer the peer executing this task
+     * @throws IOException if raised by a peer operation
+     * @throws KeeperException if raised by a peer operation
+     * @throws InterruptedException if the task is interrupted
+     */
+    @Override
+    public void bsp(BSPPeer bspPeer) throws IOException,
+        KeeperException, InterruptedException {
+      byte[] dummyData = new byte[sizeOfMsg];
+      String[] peers = bspPeer.getAllPeerNames();
+      String peerName = bspPeer.getPeerName();
+
+      for (int i = 0; i < nSupersteps; i++) {
+
+        for (int j = 0; j < nCommunications; j++) {
+          String tPeer = peers[r.nextInt(peers.length)];
+          String tag = peerName + " to " + tPeer;
+          BSPMessage msg = new ByteMessage(Bytes.toBytes(tag), dummyData);
+          bspPeer.send(tPeer, msg);
+        }
+
+        bspPeer.sync();
+
+        ByteMessage received;
+        while ((received = (ByteMessage) bspPeer.getCurrentMessage()) != null) {
+          LOG.info(Bytes.toString(received.getTag()) + " : "
+              + received.getData().length);
+        }
+
+      }
+    }
+
+    /**
+     * Stores the configuration and reads the benchmark parameters; each one
+     * defaults to 1 when not set.
+     */
+    @Override
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+      this.sizeOfMsg = conf.getInt(SIZEOFMSG, 1);
+      this.nCommunications = conf.getInt(N_COMMUNICATIONS, 1);
+      this.nSupersteps = conf.getInt(N_SUPERSTEPS, 1);
+    }
+
+    /** Returns the configuration last passed to {@code setConf}. */
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+  }
+
+  /**
+   * Submits the benchmark job and prints the elapsed time once it succeeds.
+   *
+   * @param args {@code <sizeOfMsg> <nCommunications> <nSupersteps>}
+   * @throws Exception if an argument is not an integer or the job cannot run
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length < 3) {
+      System.out.println("Usage: <sizeOfMsg> <nCommunications> <nSupersteps>");
+      System.exit(-1);
+    }
+
+    // BSP job configuration
+    HamaConfiguration conf = new HamaConfiguration();
+
+    conf.setInt(SIZEOFMSG, Integer.parseInt(args[0]));
+    conf.setInt(N_COMMUNICATIONS, Integer.parseInt(args[1]));
+    conf.setInt(N_SUPERSTEPS, Integer.parseInt(args[2]));
+
+    BSPJob bsp = new BSPJob(conf, RandBench.class);
+    // Set the job name
+    bsp.setJobName("Random Communication Benchmark");
+    bsp.setBspClass(RandBSP.class);
+
+    // Set the number of tasks to the maximum reported by the cluster
+    BSPJobClient jobClient = new BSPJobClient(conf);
+    ClusterStatus cluster = jobClient.getClusterStatus(false);
+    bsp.setNumBspTask(cluster.getMaxTasks());
+
+    long startTime = System.currentTimeMillis();
+    // Report a timing only for a job that completed successfully.
+    if (bsp.waitForCompletion(true)) {
+      System.out.println("Job Finished in "
+          + (double) (System.currentTimeMillis() - startTime) / 1000.0
+          + " seconds");
+    }
+  }
+}

Added: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java (added)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.IOException;
+import java.util.Date;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPJobClient;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.ClusterStatus;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Example job in which the peers, one superstep at a time, each write a
+ * greeting record that the client prints after the job has completed.
+ */
+public class SerializePrinting {
+  private static final String TMP_OUTPUT = "/tmp/test-example/";
+
+  /**
+   * BSP task that takes one turn per known peer name; on the turn matching
+   * its own name it writes a greeting file, then sleeps and synchronizes.
+   */
+  public static class HelloBSP extends BSP {
+    public static final Log LOG = LogFactory.getLog(HelloBSP.class);
+    private static final int PRINT_INTERVAL = 1000;
+    private Configuration conf;
+    private FileSystem fileSys;
+    private int num;
+
+    /**
+     * Runs one superstep per entry of {@code bspPeer.getAllPeerNames()}.
+     *
+     * @param bspPeer the peer executing this task
+     * @throws IOException if the greeting file cannot be written
+     * @throws KeeperException if raised by a peer operation
+     * @throws InterruptedException if the task is interrupted
+     */
+    @Override
+    public void bsp(BSPPeer bspPeer) throws IOException,
+        KeeperException, InterruptedException {
+      // The name of the local peer is the same in every superstep.
+      String peerName = bspPeer.getPeerName();
+
+      int i = 0;
+      for (String otherPeer : bspPeer.getAllPeerNames()) {
+        if (peerName.equals(otherPeer)) {
+          writeLogToFile(peerName, i);
+        }
+
+        Thread.sleep(PRINT_INTERVAL);
+        bspPeer.sync();
+        i++;
+      }
+    }
+
+    /**
+     * Writes a single (timestamp, greeting) record to the file
+     * {@code TMP_OUTPUT + i}.
+     *
+     * @param string name of the peer issuing the greeting
+     * @param i zero-based turn index, also used as the output file name
+     * @throws IOException if no file system is available or the write fails
+     */
+    private void writeLogToFile(String string, int i) throws IOException {
+      if (fileSys == null) {
+        // setConf() could not obtain a file system; fail with a clear cause
+        // rather than a NullPointerException further down.
+        throw new IOException("No file system available for " + TMP_OUTPUT + i);
+      }
+
+      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+          new Path(TMP_OUTPUT + i), LongWritable.class, Text.class,
+          CompressionType.NONE);
+      try {
+        writer.append(new LongWritable(System.currentTimeMillis()), new Text(
+            "Hello BSP from " + (i + 1) + " of " + num + ": " + string));
+      } finally {
+        // Release the writer even when append() throws.
+        writer.close();
+      }
+    }
+
+    /** Returns the configuration last passed to {@code setConf}. */
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+    /**
+     * Stores the configuration, parses the {@code bsp.peers.num} setting
+     * and looks up the file system to write to.
+     *
+     * @param conf configuration of the running task
+     */
+    @Override
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+      num = Integer.parseInt(conf.get("bsp.peers.num"));
+      try {
+        fileSys = FileSystem.get(conf);
+      } catch (IOException e) {
+        // This method declares no checked exceptions, so log the failure
+        // here; writeLogToFile() reports it when the file system is needed.
+        LOG.error("Could not obtain the file system", e);
+      }
+    }
+
+  }
+
+  /**
+   * Prints the first record of each per-task output file.
+   *
+   * @param fileSys file system holding the output files
+   * @param cluster supplies the number of files to read
+   * @param conf configuration used to open the files
+   * @throws IOException if a file cannot be opened or read
+   */
+  private static void printOutput(FileSystem fileSys, ClusterStatus cluster,
+      HamaConfiguration conf) throws IOException {
+    System.out.println("Each task printed the \"Hello World\" as below:");
+    for (int i = 0; i < cluster.getGroomServers(); i++) {
+      SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(
+          TMP_OUTPUT + i), conf);
+      try {
+        LongWritable timestamp = new LongWritable();
+        Text message = new Text();
+        reader.next(timestamp, message);
+        System.out.println(new Date(timestamp.get()) + ": " + message);
+      } finally {
+        // Release the reader even when next() throws.
+        reader.close();
+      }
+    }
+  }
+
+  /** Removes output left over from a previous run, if any. */
+  private static void initTempDir(FileSystem fileSys) throws IOException {
+    Path outputDir = new Path(TMP_OUTPUT);
+    if (fileSys.exists(outputDir)) {
+      fileSys.delete(outputDir, true);
+    }
+  }
+
+  /**
+   * Configures and submits the job with one task per groom server; on
+   * success prints what each task wrote.
+   */
+  public static void main(String[] args) throws InterruptedException,
+      IOException, ClassNotFoundException {
+    // BSP job configuration
+    HamaConfiguration conf = new HamaConfiguration();
+
+    BSPJob bsp = new BSPJob(conf, SerializePrinting.class);
+    // Set the job name
+    bsp.setJobName("Serialize Printing");
+    bsp.setBspClass(HelloBSP.class);
+
+    // Set the task size as a number of GroomServer
+    BSPJobClient jobClient = new BSPJobClient(conf);
+    ClusterStatus cluster = jobClient.getClusterStatus(false);
+    bsp.setNumBspTask(cluster.getGroomServers());
+
+    FileSystem fileSys = FileSystem.get(conf);
+    initTempDir(fileSys);
+
+    if (bsp.waitForCompletion(true)) {
+      printOutput(fileSys, cluster, conf);
+    }
+  }
+
+}