You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2011/09/27 11:35:48 UTC

svn commit: r1176297 [11/19] - in /incubator/hama/branches/HamaV2: ./ api/ api/target/ api/target/classes/ api/target/classes/META-INF/ api/target/lib/ api/target/maven-archiver/ api/target/maven-shared-archive-resources/ api/target/maven-shared-archiv...

Added: incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/bspmaster.jsp
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/bspmaster.jsp?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/bspmaster.jsp (added)
+++ incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/bspmaster.jsp Tue Sep 27 09:35:21 2011
@@ -0,0 +1,82 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<%@ page contentType="text/html; charset=UTF-8" import="javax.servlet.*"
+	import="javax.servlet.http.*" import="java.io.*" import="java.util.*"
+	import="java.text.DecimalFormat" import="org.apache.hama.bsp.*"
+	import="org.apache.hama.util.*"%>
+<%!private static final long serialVersionUID = 1L;%>
+<%
+  BSPMaster tracker = (BSPMaster) application
+      .getAttribute("bsp.master");
+  ClusterStatus status = tracker.getClusterStatus(true);
+  String trackerName = tracker.getBSPMasterName();
+  JobStatus[] runningJobs = tracker.jobsToComplete();
+  JobStatus[] allJobs = tracker.getAllJobs();
+%>
+<%!private static DecimalFormat percentFormat = new DecimalFormat("##0.00");
+ 
+  public void generateSummaryTable(JspWriter out, ClusterStatus status,
+      BSPMaster tracker) throws IOException {
+    String tasksPerNode = status.getGroomServers() > 0 ? percentFormat
+        .format(((double) (status.getMaxTasks()) / status
+            .getGroomServers())) : "-";
+    out.print("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\">\n"
+        + "<tr>" + "<th>Groom Servers</th><th>BSP Task Capacity</th>"
+        + "<th>Avg. Tasks/Node</th>"
+        + "<th>Blacklisted Nodes</th></tr>\n");
+    out.print("<tr><td><a href=\"machines.jsp?type=active\">"
+        + status.getActiveGroomNames().size() + "</a></td><td>"
+        + status.getMaxTasks() + "</td><td>" + tasksPerNode
+        + "</td><td><a href=\"machines.jsp?type=blacklisted\">" + 0
+        + "</a>" + "</td></tr></table>\n");
+
+    out.print("<br>");
+  }%>
+
+
+<html>
+<head>
+<title><%=trackerName%> Hama BSP Administration</title>
+<!--  <link rel="stylesheet" type="text/css" href="/static/hadoop.css">-->
+</head>
+<body>
+
+<h1><%=trackerName%> Hama BSP Administration</h1>
+
+<b>State:</b>
+<%=status.getBSPMasterState()%><br>
+<b>Started:</b>
+<%=new Date(tracker.getStartTime())%><br>
+<b>Identifier:</b>
+<%=tracker.getBSPMasterIdentifier()%><br>
+
+<hr>
+<%
+  generateSummaryTable(out, status, tracker);
+%>
+<hr />
+
+<h2 id="running_jobs">Running Jobs</h2>
+<%=BSPServletUtil.generateJobTable("Running", runningJobs,
+          30, 0)%>
+<hr> 
+<h2 id="all_jobs">All Jobs History</h2>
+<%=BSPServletUtil.generateJobTable("All", allJobs,
+          30, 0)%>
+<%
+  out.println(BSPServletUtil.htmlFooter());
+%>
\ No newline at end of file

Propchange: incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/bspmaster.jsp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/index.html
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/index.html?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/index.html (added)
+++ incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/index.html Tue Sep 27 09:35:21 2011
@@ -0,0 +1,36 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<html>
+
+<head>
+<meta HTTP-EQUIV="REFRESH" content="0;url=bspmaster.jsp"/>
+<title>Hama Administration</title>
+</head>
+
+<body>
+
+<h1>Hama Administration</h1>
+
+<ul>
+
+<li><a href="bspmaster.jsp">BSPMaster</a></li>
+
+</ul>
+
+</body>
+
+</html>

Propchange: incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/index.html
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/machines.jsp
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/machines.jsp?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/machines.jsp (added)
+++ incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/machines.jsp Tue Sep 27 09:35:21 2011
@@ -0,0 +1,41 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<%@ page contentType="text/html; charset=UTF-8" import="javax.servlet.*"
+	import="javax.servlet.http.*" import="java.io.*" import="java.util.*"
+	import="java.text.DecimalFormat" import="org.apache.hama.bsp.*"
+	import="org.apache.hama.util.*"%>
+<%!private static final long serialVersionUID = 1L;%>
+<%
+	BSPMaster tracker = (BSPMaster) application
+			.getAttribute("bsp.master");
+	ClusterStatus status = tracker.getClusterStatus(true);
+	String trackerName = tracker.getBSPMasterName();
+	String type = request.getParameter("type");
+%>
+
+<html>
+
+<title><%=trackerName%> Hama Machine List</title>
+
+<body>
+<h1><a href="bspmaster.jsp"><%=trackerName%></a> Hama Machine List</h1>
+
+<h2>Grooms</h2>
+<%
+  out.println(BSPServletUtil.generateGroomsTable(type, status, tracker));
+  out.println(BSPServletUtil.htmlFooter());
+%>
\ No newline at end of file

Propchange: incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/machines.jsp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaCluster.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaCluster.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaCluster.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaCluster.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,47 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Base class for tests that run against the mini cluster started by
+ * {@link HamaClusterTestCase}. Both constructors and {@code setUp()} only
+ * delegate to the superclass.
+ *
+ * NOTE(review): the static {@code conf} declared here has the same name as the
+ * instance field {@code HamaTestCase.conf} and hides it in this class and its
+ * subclasses. It is a separate {@code HamaConfiguration}; the cluster settings
+ * written by {@code HamaClusterTestCase} (ZooKeeper client port,
+ * {@code fs.defaultFS}) go to the inherited instance field, so
+ * {@link #getConf()} does not return them.
+ */
+public abstract class HamaCluster extends HamaClusterTestCase {
+  public static final Log LOG = LogFactory.getLog(HamaCluster.class);
+  protected final static HamaConfiguration conf = new HamaConfiguration();
+
+  public HamaCluster(){
+    super();
+  }
+
+  public HamaCluster(boolean startDfs) {
+    super(startDfs);
+  }
+
+  protected void setUp() throws Exception {
+    super.setUp();
+  }
+
+  protected static HamaConfiguration getConf() {
+    return conf;
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaCluster.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaClusterTestCase.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaClusterTestCase.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaClusterTestCase.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaClusterTestCase.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.File;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+public abstract class HamaClusterTestCase extends HamaTestCase {
+  public static final Log LOG = LogFactory.getLog(HamaClusterTestCase.class);
+  protected MiniDFSCluster dfsCluster;
+  protected MiniBSPCluster bspCluster;
+  protected MiniZooKeeperCluster zooKeeperCluster;
+  protected boolean startDfs;
+  protected int numOfGroom = 1;
+
+  /** default constructor */
+  public HamaClusterTestCase() {
+    this(false);
+  }
+
+  public HamaClusterTestCase(boolean startDfs) {
+    super();
+    this.startDfs = startDfs;
+  }
+
+  /**
+   * Starts the mini ZooKeeper cluster and then the mini BSP cluster.
+   *
+   * @throws Exception if either cluster fails to start
+   */
+  protected void hamaClusterSetup() throws Exception {
+    // ZooKeeper goes first: its client port has to be written into the
+    // configuration before the BSP cluster is created from that configuration.
+    String zkDirName = getUnitTestdir(getName()).toString();
+    zooKeeperCluster = new MiniZooKeeperCluster();
+    int zkPort = zooKeeperCluster.startup(new File(zkDirName));
+    this.conf.set("hama.zookeeper.property.clientPort", String.valueOf(zkPort));
+
+    MiniBSPCluster cluster = new MiniBSPCluster(this.conf, numOfGroom);
+    this.bspCluster = cluster;
+    cluster.startBSPCluster();
+  }
+
+  /**
+   * Starts the mini DFS (only when requested through the constructor), runs
+   * the base-class set-up and then starts ZooKeeper and the BSP cluster.
+   * <p>
+   * Whatever was already started is shut down again when set-up does not
+   * complete. The clean-up lives in a {@code finally} block because
+   * {@code MiniBSPCluster} reports start-up failures through JUnit's
+   * {@code fail()}, which throws an {@code Error} that a
+   * {@code catch (Exception)} clause never sees.
+   *
+   * @throws Exception if any part of the cluster cannot be started
+   */
+  @Override
+  protected void setUp() throws Exception {
+    boolean started = false;
+    try {
+      if (this.startDfs) {
+        // This spews a bunch of warnings about missing scheme. TODO: fix.
+        this.dfsCluster = new MiniDFSCluster(0, this.conf, 2, true, true, true,
+          null, null, null, null);
+
+        // mangle the conf so that the fs parameter points to the minidfs we
+        // just started up
+        FileSystem filesystem = dfsCluster.getFileSystem();
+        conf.set("fs.defaultFS", filesystem.getUri().toString());
+        Path parentdir = filesystem.getHomeDirectory();
+
+        filesystem.mkdirs(parentdir);
+      }
+
+      // do the super setup now. if we had done it first, then we would have
+      // gotten our conf all mangled and a local fs started up.
+      super.setUp();
+
+      // start the instance
+      hamaClusterSetup();
+      started = true;
+    } finally {
+      if (!started) {
+        releaseAfterFailedSetUp();
+      }
+    }
+  }
+
+  /**
+   * Best-effort shutdown of whatever a failed set-up had already started.
+   * Shutdown errors are logged and not rethrown so that the failure which
+   * aborted set-up is the one reported to the caller.
+   */
+  private void releaseAfterFailedSetUp() {
+    try {
+      if (bspCluster != null) {
+        bspCluster.shutdown();
+      }
+    } catch (Exception e) {
+      LOG.error("error shutting down BSP cluster after failed setup", e);
+    }
+    try {
+      if (zooKeeperCluster != null) {
+        zooKeeperCluster.shutdown();
+      }
+    } catch (Exception e) {
+      LOG.error("error shutting down ZooKeeper after failed setup", e);
+    }
+    if (dfsCluster != null) {
+      shutdownDfs(dfsCluster);
+    }
+  }
+
+  /**
+   * Runs the base-class tear-down, then stops the mini DFS (when it was
+   * requested), the BSP cluster and ZooKeeper.
+   * <p>
+   * Each shutdown is attempted independently, so a failure in one does not
+   * leave the others running; failures are logged and not rethrown.
+   *
+   * @throws Exception if the base-class tear-down fails
+   */
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+    try {
+      if (startDfs) {
+        shutdownDfs(dfsCluster);
+      }
+    } catch (Exception e) {
+      LOG.error("error shutting down mini DFS", e);
+    }
+    try {
+      if (bspCluster != null) {
+        bspCluster.shutdown();
+      }
+    } catch (Exception e) {
+      LOG.error("error shutting down BSP cluster", e);
+    }
+    try {
+      // ZooKeeper is started in hamaClusterSetup(); stop it here as well so
+      // its server does not outlive the test.
+      if (zooKeeperCluster != null) {
+        zooKeeperCluster.shutdown();
+      }
+    } catch (Exception e) {
+      LOG.error("error shutting down ZooKeeper", e);
+    }
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaClusterTestCase.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaTestCase.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaTestCase.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaTestCase.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaTestCase.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.AssertionFailedError;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hama.util.Bytes;
+
+public abstract class HamaTestCase extends TestCase {
+  private static Log LOG = LogFactory.getLog(HamaTestCase.class);
+  
+  /** configuration parameter name for test directory */
+  public static final String TEST_DIRECTORY_KEY = "test.build.data";
+
+  private boolean localfs = false;
+  protected Path testDir = null;
+  protected FileSystem fs = null;
+  
+  static {
+    initialize();
+  }
+
+  public volatile HamaConfiguration conf;
+
+  /** constructor */
+  public HamaTestCase() {
+    super();
+    init();
+  }
+  
+  /**
+   * @param name
+   */
+  public HamaTestCase(String name) {
+    super(name);
+    init();
+  }
+  
+  private void init() {
+    conf = new HamaConfiguration();
+    conf.setStrings("bsp.local.dir", "/tmp/hama-test");
+    conf.set("bsp.master.address", "localhost");
+    conf.set("bsp.groom.report.address", "127.0.0.1:0");
+  }
+
+  /**
+   * Resolves the file system and the test directory for this test.
+   * <p>
+   * Call this only after a mini HDFS cluster (if any) has been started and
+   * written into the configuration; otherwise the local file system is used.
+   * On the local file system a test directory left over from an earlier run
+   * is deleted.
+   *
+   * @throws Exception if the file system cannot be obtained or prepared
+   */
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    String defaultFs = conf.get("fs.defaultFS", "file:///");
+    localfs = "file:///".equals(defaultFs);
+
+    if (null == fs) {
+      fs = FileSystem.get(conf);
+    }
+    try {
+      if (!localfs) {
+        testDir = fs.makeQualified(new Path("/tmp/hama-test"));
+      } else {
+        testDir = getUnitTestdir(getName());
+        boolean leftOver = fs.exists(testDir);
+        if (leftOver) {
+          fs.delete(testDir, true);
+        }
+      }
+    } catch (Exception e) {
+      LOG.fatal("error during setup", e);
+      throw e;
+    }
+  }
+
+  /**
+   * Removes the test directory when running on the local file system, then
+   * runs the JUnit tear-down. Clean-up errors are logged, not rethrown.
+   *
+   * @throws Exception if the JUnit tear-down fails
+   */
+  @Override
+  protected void tearDown() throws Exception {
+    try {
+      if (localfs && fs.exists(testDir)) {
+        fs.delete(testDir, true);
+      }
+    } catch (Exception e) {
+      LOG.fatal("error during tear down", e);
+    }
+    super.tearDown();
+  }
+
+  protected Path getUnitTestdir(String testName) {
+    return new Path(
+        conf.get(TEST_DIRECTORY_KEY, "/tmp/hama-test/build/data"), testName);
+  }
+
+  /**
+   * Initializes the test environment; called from this class's static
+   * initializer.
+   *
+   * If the system property named by TEST_DIRECTORY_KEY is not set, it is set
+   * to the absolute path of the relative directory "build/hama/test". A value
+   * that is already present is left unchanged. Nothing else is configured
+   * here.
+   */
+  public static void initialize() {
+    if (System.getProperty(TEST_DIRECTORY_KEY) == null) {
+      System.setProperty(TEST_DIRECTORY_KEY, new File(
+          "build/hama/test").getAbsolutePath());
+    }
+  }
+
+  /**
+   * Common method to close down a MiniDFSCluster and the associated file
+   * system. Errors raised while shutting down are logged and never propagated,
+   * so that clean-up problems do not fail a test.
+   *
+   * @param cluster the cluster to stop; nothing happens when it is
+   *          {@code null}
+   */
+  public static void shutdownDfs(MiniDFSCluster cluster) {
+    if (cluster == null) {
+      return;
+    }
+    LOG.info("Shutting down Mini DFS ");
+    try {
+      cluster.shutdown();
+    } catch (Exception e) {
+      // Can get a java.lang.reflect.UndeclaredThrowableException thrown
+      // here because of an InterruptedException. Don't let exceptions in
+      // here be cause of test failure, but leave a trace of them in the log.
+      LOG.warn("ignoring error while shutting down Mini DFS", e);
+    }
+    try {
+      FileSystem fs = cluster.getFileSystem();
+      if (fs != null) {
+        LOG.info("Shutting down FileSystem");
+        fs.close();
+      }
+      FileSystem.closeAll();
+    } catch (IOException e) {
+      LOG.error("error closing file system", e);
+    }
+  }
+
+  public void assertByteEquals(byte[] expected,
+                               byte[] actual) {
+    if (Bytes.compareTo(expected, actual) != 0) {
+      throw new AssertionFailedError("expected:<" +
+      Bytes.toString(expected) + "> but was:<" +
+      Bytes.toString(actual) + ">");
+    }
+  }
+
+  public static void assertEquals(byte[] expected,
+                               byte[] actual) {
+    if (Bytes.compareTo(expected, actual) != 0) {
+      throw new AssertionFailedError("expected:<" +
+      Bytes.toStringBinary(expected) + "> but was:<" +
+      Bytes.toStringBinary(actual) + ">");
+    }
+  }
+
+}

Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaTestCase.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/MiniBSPCluster.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/MiniBSPCluster.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/MiniBSPCluster.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/MiniBSPCluster.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.IOException;
+
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import static java.util.concurrent.TimeUnit.*;
+
+import static junit.framework.Assert.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hama.bsp.BSPMaster;
+import org.apache.hama.bsp.GroomServer;
+import org.apache.hama.HamaConfiguration;
+
+
+public class MiniBSPCluster {
+
+  public static final Log LOG = LogFactory.getLog(MiniBSPCluster.class);
+
+  private ScheduledExecutorService scheduler;
+
+  private HamaConfiguration configuration;
+  private BSPMasterRunner master;
+  private List<GroomServerRunner> groomServerList = 
+    new CopyOnWriteArrayList<GroomServerRunner>();
+  private int grooms;
+
+  public class BSPMasterRunner implements Runnable{
+    BSPMaster bspm;
+    HamaConfiguration conf;
+
+    public BSPMasterRunner(HamaConfiguration conf){
+      this.conf = conf;
+      if(null == this.conf) 
+        throw new NullPointerException("No Configuration for BSPMaster.");
+    }  
+
+    public void run(){
+      try{
+        LOG.info("Starting BSP Master.");
+        this.bspm = BSPMaster.startMaster(this.conf); 
+        this.bspm.offerService();
+      }catch(IOException ioe){
+        LOG.error("Fail to startup BSP Master.", ioe);
+      }catch(InterruptedException ie){
+        LOG.error("BSP Master fails in offerService().", ie);
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    public void shutdown(){
+      if(null != this.bspm) this.bspm.shutdown();
+    }
+
+    public boolean isRunning(){
+      if(null == this.bspm) return false;
+
+      if(this.bspm.currentState().equals(BSPMaster.State.RUNNING)){
+        return true;
+      } 
+      return false;
+    }
+
+    public BSPMaster getMaster(){
+      return this.bspm;
+    }
+  }
+
+  public class GroomServerRunner implements Runnable{
+    GroomServer gs;
+    HamaConfiguration conf;
+
+    public GroomServerRunner(HamaConfiguration conf){
+      this.conf = conf;
+    }
+ 
+    public void run(){
+      try{
+        this.gs = GroomServer.constructGroomServer(GroomServer.class, conf);
+        GroomServer.startGroomServer(this.gs).join();
+      }catch(InterruptedException ie){
+        LOG.error("Fail to start GroomServer. ", ie);
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    public void shutdown(){
+      try{
+        if(null != this.gs) this.gs.shutdown();
+      }catch(IOException ioe){
+        LOG.info("Fail to shutdown GroomServer.", ioe);
+      }
+    }
+    
+    public boolean isRunning(){
+      if(null == this.gs) return false;
+      return this.gs.isRunning(); 
+    }
+
+    public GroomServer getGroomServer(){
+      return this.gs;
+    }
+  }
+
+  public MiniBSPCluster(HamaConfiguration conf, int groomServers) {
+    this.configuration = conf;
+    this.grooms = groomServers;
+    if(1 > this.grooms) {
+      this.grooms = 2;  
+    }
+    LOG.info("Groom server number "+this.grooms);
+    int threadpool = conf.getInt("bsp.test.threadpool", 10);
+    LOG.info("Thread pool value "+threadpool);
+    scheduler = Executors.newScheduledThreadPool(threadpool);
+  }
+
+  public void startBSPCluster(){
+    startMaster();
+    startGroomServers();
+  }
+
+  public void shutdownBSPCluster(){
+    if(null != this.master && this.master.isRunning())
+      this.master.shutdown();
+    if(0 < groomServerList.size()){
+      for(GroomServerRunner groom: groomServerList){
+        if(groom.isRunning()) groom.shutdown();
+      }
+    }
+  }
+
+
+  public void startMaster(){
+    if(null == this.scheduler) 
+      throw new NullPointerException("No ScheduledExecutorService exists.");
+    this.master = new BSPMasterRunner(this.configuration);
+    scheduler.schedule(this.master, 0, SECONDS);
+  }
+
+  /**
+   * Waits for the BSPMaster to report that it is running, then launches the
+   * configured number of groom servers one after another, waiting for each
+   * one to report that it is running before starting the next.
+   * <p>
+   * Waiting is done in one-second steps. The test is failed through JUnit's
+   * fail() when the master needs more than 100 steps, when a groom server
+   * needs more than 10 steps, or when the waiting thread is interrupted.
+   * Failing on interrupt matters: the interrupt flag is restored, so a loop
+   * that kept going would have every later sleep throw immediately and would
+   * spin forever.
+   *
+   * @throws NullPointerException if the scheduler or the master runner is
+   *           missing
+   */
+  public void startGroomServers(){
+    if(null == this.scheduler)
+      throw new NullPointerException("No ScheduledExecutorService exists.");
+    if(null == this.master)
+      throw new NullPointerException("No BSPMaster exists.");
+    int cnt=0;
+    while(!this.master.isRunning()){
+      LOG.info("Waiting BSPMaster up.");
+      pauseOneSecond("Fail to check BSP Master's state.");
+      cnt++;
+      if(100 < cnt){
+        fail("Fail to launch BSPMaster.");
+      }
+    }
+    for(int i=0; i < this.grooms; i++){
+      // every groom server gets its own copy of the configuration with its
+      // own ports
+      HamaConfiguration c = new HamaConfiguration(this.configuration);
+      randomPort(c);
+      GroomServerRunner gsr = new GroomServerRunner(c);
+      groomServerList.add(gsr);
+      scheduler.schedule(gsr, 0, SECONDS);
+      cnt = 0;
+      while(!gsr.isRunning()){
+        LOG.info("Waitin for GroomServer up.");
+        pauseOneSecond("Fail to check Groom Server's state.");
+        cnt++;
+        if(10 < cnt){
+          fail("Fail to launch groom server.");
+        }
+      }
+    }
+
+  }
+
+  /**
+   * Sleeps for one second. On interruption the error is logged, the interrupt
+   * flag is restored and the test is failed with the given message.
+   */
+  private static void pauseOneSecond(String interruptedMessage){
+    try{
+      Thread.sleep(1000);
+    }catch(InterruptedException ie){
+      LOG.error(interruptedMessage, ie);
+      Thread.currentThread().interrupt();
+      fail(interruptedMessage);
+    }
+  }
+
+  /**
+   * Picks two free ports and stores them in the given configuration as the
+   * peer port and the groom RPC port.
+   * <p>
+   * Both probe sockets are open at the same time, so the two ports differ,
+   * and both are real ports handed out by the operating system (a fixed
+   * offset from the first port could be in use or above 65535). The sockets
+   * are closed before returning, so another process could still grab a port
+   * before the groom server binds it.
+   *
+   * @param conf configuration to receive the two port settings; left
+   *          unchanged if no port could be obtained
+   */
+  private void randomPort(HamaConfiguration conf){
+    ServerSocket peerSocket = null;
+    ServerSocket rpcSocket = null;
+    try{
+      peerSocket = new ServerSocket(0);
+      rpcSocket = new ServerSocket(0);
+      int peerPort = peerSocket.getLocalPort();
+      int rpcPort = rpcSocket.getLocalPort();
+      conf.set(Constants.PEER_PORT, Integer.toString(peerPort));
+      conf.setInt(Constants.GROOM_RPC_PORT, rpcPort);
+    }catch(IOException ioe){
+      LOG.error("Can not find a free port for BSPPeer.", ioe);
+    }finally{
+      closeProbeSocket(peerSocket);
+      closeProbeSocket(rpcSocket);
+    }
+  }
+
+  /** Closes a probe socket, logging instead of propagating a close failure. */
+  private static void closeProbeSocket(ServerSocket socket){
+    if(null == socket) return;
+    try{
+      socket.close();
+    }catch(IOException ioe){
+      LOG.warn("Can not close the socket used to find a free port.", ioe);
+    }
+  }
+
+  public void shutdown() {
+    shutdownBSPCluster();
+    scheduler.shutdown();
+  }
+
+  public List<Thread> getGroomServerThreads() {
+    List<Thread> list = new ArrayList<Thread>();
+    for(GroomServerRunner gsr: groomServerList){
+      list.add(new Thread(gsr));
+    }
+    return list;
+  }
+
+  public Thread getMaster() {
+    return new Thread(this.master);
+  }
+
+  public List<GroomServer> getGroomServers(){
+    List<GroomServer> list = new ArrayList<GroomServer>();
+    for(GroomServerRunner gsr: groomServerList){
+      list.add(gsr.getGroomServer());
+    }
+    return list;
+  }
+
+  public BSPMaster getBSPMaster(){
+    if(null != this.master)
+      return this.master.getMaster();
+    return null;
+  }
+
+  public ScheduledExecutorService getScheduler(){
+    return this.scheduler;
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/MiniBSPCluster.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.BSPPeer.BSPSerializableMessage;
+
+public final class BSPSerializerWrapper {
+
+  private final BSPPeer.BSPMessageSerializer serializer;
+
+  public BSPSerializerWrapper(Configuration conf, int port) throws IOException {
+    this.serializer = new BSPPeer(conf, null, null).new BSPMessageSerializer(
+      conf.getInt("bsp.checkpoint.port", port)); 
+  }  
+
+  public final void serialize(final BSPSerializableMessage tmp) 
+      throws IOException {
+    this.serializer.serialize(tmp);
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,95 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+
+public class TestBSPMasterGroomServer extends HamaCluster {
+
+  private static Log LOG = LogFactory.getLog(TestBSPMasterGroomServer.class);
+  private static String TMP_OUTPUT = "/tmp/test-example/";
+  private HamaConfiguration configuration;
+  private String TEST_JOB = "src/test/java/testjar/testjob.jar";
+
+  public TestBSPMasterGroomServer() {
+    configuration = new HamaConfiguration();
+    configuration.set("bsp.master.address", "localhost");
+    assertEquals("Make sure master addr is set to localhost:", "localhost",
+        configuration.get("bsp.master.address"));
+    configuration.setStrings("bsp.local.dir", "/tmp/hama-test");
+    System.setProperty("hama.log.dir", "/tmp/hama-test/logs");
+    configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+    configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+
+  public void testSubmitJob() throws Exception {
+    BSPJob bsp = new BSPJob(configuration);
+    bsp.setJobName("Test Serialize Printing");
+    bsp.setBspClass(testjar.ClassSerializePrinting.HelloBSP.class);
+    bsp.setJar(System.getProperty("user.dir") + "/" + TEST_JOB);
+
+    // Set the task size as a number of GroomServer
+    BSPJobClient jobClient = new BSPJobClient(configuration);
+    configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);
+    ClusterStatus cluster = jobClient.getClusterStatus(false);
+    assertEquals(this.numOfGroom, cluster.getGroomServers());
+    bsp.setNumBspTask(2);
+
+    FileSystem fileSys = FileSystem.get(conf);
+
+    if (bsp.waitForCompletion(true)) {
+      checkOutput(fileSys, cluster, conf);
+    }
+    LOG.info("Client finishes execution job.");
+  }
+
+  private static void checkOutput(FileSystem fileSys, ClusterStatus cluster,
+      HamaConfiguration conf) throws Exception {
+    for (int i = 0; i < 2; i++) {
+      SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(
+          TMP_OUTPUT + i), conf);
+      LongWritable timestamp = new LongWritable();
+      Text message = new Text();
+      reader.next(timestamp, message);
+
+      LOG.info("output: " + message);
+      assertTrue("Check if `Hello BSP' gets printed.", message.toString()
+          .indexOf("Hello BSP from") >= 0);
+      reader.close();
+    }
+  }
+
+  public void tearDown() throws Exception {
+    super.tearDown();
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,81 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import junit.framework.TestCase;
+
+/**
+ * Checks that a {@link BSPMessageBundle} survives a write/readFields round
+ * trip through an in-memory byte buffer.
+ */
+public class TestBSPMessageBundle extends TestCase {
+
+  /** An empty bundle must still be empty after the round trip. */
+  public void testEmpty() throws IOException {
+    BSPMessageBundle copy = roundTrip(new BSPMessageBundle());
+    assertEquals(0, copy.getMessages().size());
+  }
+
+  /**
+   * Sixteen byte messages must come back in the same order with identical
+   * tag and data.
+   */
+  public void testSerializationDeserialization() throws IOException {
+    BSPMessageBundle bundle = new BSPMessageBundle();
+    ByteMessage[] sent = new ByteMessage[16];
+    for (int i = 0; i < sent.length; ++i) {
+      // One-byte tag and one-byte data part, both holding the message number.
+      byte[] tag = { (byte) i };
+      byte[] data = { (byte) i };
+      sent[i] = new ByteMessage(tag, data);
+      bundle.addMessage(sent[i]);
+    }
+
+    BSPMessageBundle copy = roundTrip(bundle);
+
+    // Check contents.
+    int seen = 0;
+    for (BSPMessage received : copy.getMessages()) {
+      ByteMessage actual = (ByteMessage) received;
+      ByteMessage expected = sent[seen++];
+      assertTrue(Arrays.equals(expected.getTag(), actual.getTag()));
+      assertTrue(Arrays.equals(expected.getData(), actual.getData()));
+    }
+    assertEquals(sent.length, seen);
+  }
+
+  /**
+   * Serializes the bundle into a byte array and reads it back into a new
+   * bundle instance.
+   */
+  private static BSPMessageBundle roundTrip(BSPMessageBundle original)
+      throws IOException {
+    ByteArrayOutputStream sink = new ByteArrayOutputStream();
+    original.write(new DataOutputStream(sink));
+    sink.close();
+
+    BSPMessageBundle copy = new BSPMessageBundle();
+    byte[] wire = sink.toByteArray();
+    copy.readFields(new DataInputStream(new ByteArrayInputStream(wire)));
+    return copy;
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+
+/**
+ * Verifies that a {@link ClusterStatus} written to a buffer can be read back
+ * with the same groom servers, task count and maximum task count.
+ */
+public class TestClusterStatus extends TestCase {
+  Random rnd = new Random();
+
+  protected void setUp() throws Exception {
+    super.setUp();
+  }
+
+  /**
+   * Writes a status holding ten randomly named grooms and compares it with
+   * the copy obtained through {@link ClusterStatus#readFields}.
+   */
+  public final void testWriteAndReadFields() throws IOException {
+    DataOutputBuffer out = new DataOutputBuffer();
+    DataInputBuffer in = new DataInputBuffer();
+
+    ClusterStatus status1;
+    Map<String, GroomServerStatus> grooms = new HashMap<String, GroomServerStatus>();
+
+    for (int i = 0; i < 10; i++) {
+      int num = rnd.nextInt();
+      String groomName = "groom_" + num;
+      String peerName = "peerhost:" + num;
+      // Every groom is created with 25 failures and a maximum of 2 tasks.
+      grooms.put(groomName, new GroomServerStatus(peerName,
+          new ArrayList<TaskStatus>(0), 25, 2));
+    }
+
+    int tasks = rnd.nextInt(100);
+    int maxTasks = rnd.nextInt(100);
+    BSPMaster.State state = BSPMaster.State.RUNNING;
+
+    status1 = new ClusterStatus(grooms, tasks, maxTasks, state);
+    status1.write(out);
+
+    in.reset(out.getData(), out.getLength());
+
+    ClusterStatus status2 = new ClusterStatus();
+    status2.readFields(in);
+
+    // JUnit expects (expected, actual); keeping that order makes failure
+    // messages report the right value as the expectation.
+    for (Entry<String, GroomServerStatus> entry : status2
+        .getActiveGroomServerStatus().entrySet()) {
+      assertEquals(2, entry.getValue().getMaxTasks());
+      assertEquals(25, entry.getValue().getFailures());
+    }
+
+    Map<String, String> grooms_s = new HashMap<String, String>(
+        status1.getActiveGroomNames());
+    Map<String, String> grooms_o = new HashMap<String, String>(
+        status2.getActiveGroomNames());
+
+    assertEquals(status1.getGroomServers(), status2.getGroomServers());
+
+    // Both name maps must contain exactly the same entries.
+    assertTrue(grooms_s.entrySet().containsAll(grooms_o.entrySet()));
+    assertTrue(grooms_o.entrySet().containsAll(grooms_s.entrySet()));
+
+    assertEquals(status1.getTasks(), status2.getTasks());
+    assertEquals(status1.getMaxTasks(), status2.getMaxTasks());
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestMessages.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestMessages.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestMessages.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestMessages.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import junit.framework.TestCase;
+
+import org.apache.hama.util.Bytes;
+
+/**
+ * Tests {@link ByteMessage} with a very large payload and with a tag taken
+ * from the tail of the payload.
+ */
+public class TestMessages extends TestCase {
+
+  /**
+   * Upper bound for the large payload. Requesting an array of
+   * {@link Integer#MAX_VALUE} elements fails on common VMs, so stay slightly
+   * below it.
+   */
+  private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+  public void testByteMessage() {
+    // 60% of the maximum heap, capped so that heaps larger than ~3.5 GB (or
+    // an unlimited heap) do not turn into an impossible array length.
+    long sixtyPercentOfHeap = (long) (Runtime.getRuntime().maxMemory() * 0.60);
+    int dataSize = (int) Math.min(sixtyPercentOfHeap, (long) MAX_ARRAY_SIZE);
+    ByteMessage msg = new ByteMessage(Bytes.toBytes("tag"), new byte[dataSize]);
+    assertEquals(dataSize, msg.getData().length);
+    // Drop the reference so the large payload can be reclaimed.
+    msg = null;
+
+    byte[] dummyData = new byte[1024];
+    // Use distinct byte values so the tail comparison below is not trivially
+    // true for any 128-byte window.
+    for (int i = 0; i < dummyData.length; i++) {
+      dummyData[i] = (byte) i;
+    }
+    ByteMessage msg2 = new ByteMessage(Bytes.tail(dummyData, 128), dummyData);
+    assertEquals(
+        0,
+        Bytes.compareTo(msg2.getTag(), 0, 128, msg2.getData(),
+            msg2.getData().length - 128, 128));
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestMessages.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.checkpoint;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPMessage;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.BSPSerializerWrapper;
+import org.apache.hama.bsp.DoubleMessage;
+import org.apache.hama.bsp.BSPPeer.BSPSerializableMessage;
+
+/**
+ * Starts a {@link CheckpointRunner}, sends one message bundle to it through
+ * {@link BSPSerializerWrapper} and checks that the bundle can be read back
+ * from the file system.
+ */
+public class TestCheckpoint extends TestCase {
+
+  public static final Log LOG = LogFactory.getLog(TestCheckpoint.class);
+
+  private CheckpointRunner runner;
+  private BSPSerializerWrapper serializer;
+  static final String TEST_STRING = "Test String";
+  private FileSystem hdfs;
+  /** The single message that is checkpointed and expected back. */
+  static final DoubleMessage estimate = 
+    new DoubleMessage("192.168.1.123:61000", 3.1415926d);
+
+  /**
+   * Obtains the file system, starts the checkpoint runner, waits one second
+   * and then creates the serializer used by the test.
+   */
+  public void setUp() throws Exception {
+    Configuration conf = new HamaConfiguration();
+    this.hdfs = FileSystem.get(conf);
+    assertNotNull("File system object should exist.", this.hdfs);
+    this.runner =  
+      new CheckpointRunner(CheckpointRunner.buildCommands(conf));
+    assertNotNull("Checkpoint instance should exist.", this.runner);
+    this.runner.start();
+    Thread.sleep(1000*1);
+    Process process = this.runner.getProcess();
+    assertNotNull("Checkpoint process should be created.", process);
+    this.serializer = new BSPSerializerWrapper(conf, 
+      Integer.parseInt(CheckpointRunner.DEFAULT_PORT));
+  }
+
+  /** Creates a bundle that holds only {@link #estimate}. */
+  private BSPMessageBundle createMessageBundle() {
+    BSPMessageBundle bundle = new BSPMessageBundle();
+    bundle.addMessage(estimate);
+    return bundle;
+  }
+
+  /** Path the checkpoint is expected to be written to. */
+  private String checkpointedPath() {
+      return "/tmp/" + "job_201108221205_000" + "/" + "0" +
+      "/" + "attempt_201108221205_0001_000000_0";
+  }
+
+  /**
+   * Serializes one bundle, waits a second, and verifies that the file exists
+   * and contains exactly the message that was sent.
+   */
+  public void testCheckpoint() throws Exception {
+    this.serializer.serialize(new BSPSerializableMessage(
+    checkpointedPath(), createMessageBundle()));
+    Thread.sleep(1000); 
+    Path path = new Path(checkpointedPath());
+    boolean exists = this.hdfs.exists(path);
+    assertTrue("Check if file is actually written to hdfs.", exists); 
+    BSPMessageBundle bundle = new BSPMessageBundle(); 
+    // Close the stream once the bundle is read so the file handle is not
+    // leaked, even if readFields fails.
+    DataInputStream in = new DataInputStream(this.hdfs.open(path));
+    try {
+      bundle.readFields(in);
+    } finally {
+      in.close();
+    }
+    List<BSPMessage> messages = bundle.getMessages();
+    assertEquals("Only one message exists.", 1,  messages.size());
+    for(BSPMessage message: messages) {
+      // Expected value first, so failure messages read correctly.
+      String peer = (String)message.getTag();
+      assertEquals("BSPPeer value in form of <ip>:<port>.", estimate.getTag(), peer);
+      Double pi = (Double)message.getData();
+      assertEquals("Message content.", estimate.getData(), pi);
+    }
+  }
+
+  public void tearDown() throws Exception {
+    this.runner.stop();
+  }
+  
+}

Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/ipc/TestIPC.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/ipc/TestIPC.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/ipc/TestIPC.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Exercises the Hadoop IPC {@link Client} and {@link Server} classes with
+ * echo servers, using serial calls, parallel calls and a call to an address
+ * where no server is expected to listen.
+ */
+public class TestIPC extends TestCase {
+  public static final Log LOG = LogFactory.getLog(TestIPC.class);
+
+  final private static Configuration conf = new Configuration();
+  final static private int PING_INTERVAL = 1000;
+
+  static {
+    Client.setPingInterval(conf, PING_INTERVAL);
+  }
+
+  public TestIPC(String name) {
+    super(name);
+  }
+
+  private static final Random RANDOM = new Random();
+
+  private static final String ADDRESS = "0.0.0.0";
+
+  /** Server that echoes every parameter, optionally after a random sleep. */
+  private static class TestServer extends Server {
+    private boolean sleep;
+
+    public TestServer(int handlerCount, boolean sleep) throws IOException {
+      super(ADDRESS, 0, LongWritable.class, handlerCount, conf);
+      this.sleep = sleep;
+    }
+
+    @Override
+    public Writable call(Class<?> protocol, Writable param, long receiveTime)
+        throws IOException {
+      if (sleep) {
+        try {
+          Thread.sleep(RANDOM.nextInt(2 * PING_INTERVAL)); // sleep a bit
+        } catch (InterruptedException ignored) {
+          // Deliberately ignored: an early wake-up only shortens the delay,
+          // the parameter is still echoed below.
+        }
+      }
+      return param; // echo param as result
+    }
+  }
+
+  /** Thread issuing <code>count</code> single calls against one server. */
+  private static class SerialCaller extends Thread {
+    private Client client;
+    private InetSocketAddress server;
+    private int count;
+    private boolean failed;
+
+    public SerialCaller(Client client, InetSocketAddress server, int count) {
+      this.client = client;
+      this.server = server;
+      this.count = count;
+    }
+
+    public void run() {
+      for (int i = 0; i < count; i++) {
+        try {
+          LongWritable param = new LongWritable(RANDOM.nextLong());
+          LongWritable value = (LongWritable) client.call(param, server, null,
+              null);
+          if (!param.equals(value)) {
+            LOG.fatal("Call failed!");
+            failed = true;
+            break;
+          }
+        } catch (Exception e) {
+          LOG.fatal("Caught: " + StringUtils.stringifyException(e));
+          failed = true;
+        }
+      }
+    }
+  }
+
+  /** Thread issuing <code>count</code> multi-address calls. */
+  private static class ParallelCaller extends Thread {
+    private Client client;
+    private int count;
+    private InetSocketAddress[] addresses;
+    private boolean failed;
+
+    public ParallelCaller(Client client, InetSocketAddress[] addresses,
+        int count) {
+      this.client = client;
+      this.addresses = addresses;
+      this.count = count;
+    }
+
+    public void run() {
+      for (int i = 0; i < count; i++) {
+        try {
+          Writable[] params = new Writable[addresses.length];
+          for (int j = 0; j < addresses.length; j++)
+            params[j] = new LongWritable(RANDOM.nextLong());
+          Writable[] values = client.call(params, addresses, null, null);
+          for (int j = 0; j < addresses.length; j++) {
+            if (!params[j].equals(values[j])) {
+              LOG.fatal("Call failed!");
+              failed = true;
+              break;
+            }
+          }
+        } catch (Exception e) {
+          LOG.fatal("Caught: " + StringUtils.stringifyException(e));
+          failed = true;
+        }
+      }
+    }
+  }
+
+  /**
+   * Stops every non-null client first and then every non-null server, the
+   * same order the tests use on the success path.
+   */
+  private static void stopAll(Client[] clients, Server[] servers) {
+    for (int i = 0; i < clients.length; i++) {
+      if (clients[i] != null) {
+        clients[i].stop();
+      }
+    }
+    for (int i = 0; i < servers.length; i++) {
+      if (servers[i] != null) {
+        servers[i].stop();
+      }
+    }
+  }
+
+  public void testSerial() throws Exception {
+    testSerial(3, false, 2, 5, 100);
+  }
+
+  /**
+   * Runs <code>callerCount</code> serial callers, spread over
+   * <code>clientCount</code> clients, against one echo server.
+   */
+  public void testSerial(int handlerCount, boolean handlerSleep,
+      int clientCount, int callerCount, int callCount) throws Exception {
+    Server server = new TestServer(handlerCount, handlerSleep);
+    Client[] clients = new Client[clientCount];
+    // Stop clients and server even when a caller fails or an exception is
+    // thrown, so they are not leaked into later tests.
+    try {
+      InetSocketAddress addr = NetUtils.getConnectAddress(server);
+      server.start();
+
+      for (int i = 0; i < clientCount; i++) {
+        clients[i] = new Client(LongWritable.class, conf);
+      }
+
+      SerialCaller[] callers = new SerialCaller[callerCount];
+      for (int i = 0; i < callerCount; i++) {
+        callers[i] = new SerialCaller(clients[i % clientCount], addr,
+            callCount);
+        callers[i].start();
+      }
+      for (int i = 0; i < callerCount; i++) {
+        callers[i].join();
+        assertFalse(callers[i].failed);
+      }
+    } finally {
+      stopAll(clients, new Server[] { server });
+    }
+  }
+
+  public void testParallel() throws Exception {
+    testParallel(10, false, 2, 4, 2, 4, 100);
+  }
+
+  /**
+   * Runs <code>callerCount</code> parallel callers against
+   * <code>addressCount</code> addresses that are mapped round-robin onto
+   * <code>serverCount</code> echo servers.
+   */
+  public void testParallel(int handlerCount, boolean handlerSleep,
+      int serverCount, int addressCount, int clientCount, int callerCount,
+      int callCount) throws Exception {
+    Server[] servers = new Server[serverCount];
+    Client[] clients = new Client[clientCount];
+    // Stop whatever was started, also on failure.
+    try {
+      for (int i = 0; i < serverCount; i++) {
+        servers[i] = new TestServer(handlerCount, handlerSleep);
+        servers[i].start();
+      }
+
+      InetSocketAddress[] addresses = new InetSocketAddress[addressCount];
+      for (int i = 0; i < addressCount; i++) {
+        addresses[i] = NetUtils.getConnectAddress(servers[i % serverCount]);
+      }
+
+      for (int i = 0; i < clientCount; i++) {
+        clients[i] = new Client(LongWritable.class, conf);
+      }
+
+      ParallelCaller[] callers = new ParallelCaller[callerCount];
+      for (int i = 0; i < callerCount; i++) {
+        callers[i] = new ParallelCaller(clients[i % clientCount], addresses,
+            callCount);
+        callers[i].start();
+      }
+      for (int i = 0; i < callerCount; i++) {
+        callers[i].join();
+        assertFalse(callers[i].failed);
+      }
+    } finally {
+      stopAll(clients, servers);
+    }
+  }
+
+  /**
+   * After a parallel run, calls an address without a test server and expects
+   * an {@link IOException} whose message names the address and includes the
+   * message of its cause.
+   */
+  public void testStandAloneClient() throws Exception {
+    testParallel(10, false, 2, 4, 2, 4, 100);
+    Client client = new Client(LongWritable.class, conf);
+    InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
+    try {
+      client.call(new LongWritable(RANDOM.nextLong()), address, null, null);
+      fail("Expected an exception to have been thrown");
+    } catch (IOException e) {
+      String message = e.getMessage();
+      String addressText = address.toString();
+      assertTrue("Did not find " + addressText + " in " + message, message
+          .contains(addressText));
+      Throwable cause = e.getCause();
+      assertNotNull("No nested exception in " + e, cause);
+      String causeText = cause.getMessage();
+      assertTrue("Did not find " + causeText + " in " + message, message
+          .contains(causeText));
+    } finally {
+      // The client was previously never stopped.
+      client.stop();
+    }
+  }
+
+}

Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/ipc/TestIPC.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/ipc/TestRPC.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/ipc/TestRPC.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/ipc/TestRPC.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+import junit.framework.TestCase;
+
+/**
+ * Exercises Hadoop {@link RPC} with a small echo/add protocol: single calls
+ * through a proxy, an expected remote exception and two multi-calls.
+ */
+public class TestRPC extends TestCase {
+  private static final int PORT = 1234;
+  private static final String ADDRESS = "0.0.0.0";
+
+  // Log under this class's own name, consistent with the other tests here.
+  public static final Log LOG = LogFactory.getLog(TestRPC.class);
+
+  private static Configuration conf = new Configuration();
+
+  public TestRPC(String name) {
+    super(name);
+  }
+
+  /** Protocol implemented by {@link TestImpl} and called through the proxy. */
+  public interface TestProtocol extends VersionedProtocol {
+    public static final long versionID = 1L;
+
+    void ping() throws IOException;
+
+    String echo(String value) throws IOException;
+
+    String[] echo(String[] value) throws IOException;
+
+    Writable echo(Writable value) throws IOException;
+
+    int add(int v1, int v2) throws IOException;
+
+    int add(int[] values) throws IOException;
+
+    int error() throws IOException;
+
+    void testServerGet() throws IOException;
+  }
+
+  /** Server-side implementation: echoes, adds, or throws on request. */
+  public class TestImpl implements TestProtocol {
+
+    public long getProtocolVersion(String protocol, long clientVersion) {
+      return TestProtocol.versionID;
+    }
+
+    public void ping() {
+    }
+
+    public String echo(String value) throws IOException {
+      return value;
+    }
+
+    public String[] echo(String[] values) throws IOException {
+      return values;
+    }
+
+    public Writable echo(Writable writable) {
+      return writable;
+    }
+
+    public int add(int v1, int v2) {
+      return v1 + v2;
+    }
+
+    public int add(int[] values) {
+      int sum = 0;
+      for (int i = 0; i < values.length; i++) {
+        sum += values[i];
+      }
+      return sum;
+    }
+
+    /** Always fails, so the client can check exception propagation. */
+    public int error() throws IOException {
+      throw new IOException("bobo");
+    }
+
+    /** Fails unless {@link Server#get()} returns an {@link RPC.Server}. */
+    public void testServerGet() throws IOException {
+      if (!(Server.get() instanceof RPC.Server)) {
+        throw new IOException("Server.get() failed");
+      }
+    }
+
+  }
+
+  /**
+   * Starts an RPC server on {@link #PORT}, runs every protocol method through
+   * a proxy and through {@link RPC#call}, and always shuts the server and
+   * proxy down afterwards.
+   */
+  public void testCalls() throws Exception {
+    Server server = RPC.getServer(new TestImpl(), ADDRESS, PORT, conf);
+    TestProtocol proxy = null;
+    // Release the fixed port and the proxy even when an assertion fails;
+    // otherwise a later run cannot bind the port again.
+    try {
+      server.start();
+
+      InetSocketAddress addr = new InetSocketAddress(PORT);
+      proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
+          TestProtocol.versionID, addr, conf);
+
+      proxy.ping();
+
+      String stringResult = proxy.echo("foo");
+      assertEquals("foo", stringResult);
+
+      stringResult = proxy.echo((String) null);
+      assertNull(stringResult);
+
+      String[] stringResults = proxy.echo(new String[] { "foo", "bar" });
+      assertTrue(Arrays.equals(stringResults, new String[] { "foo", "bar" }));
+
+      stringResults = proxy.echo((String[]) null);
+      assertNull(stringResults);
+
+      int intResult = proxy.add(1, 2);
+      assertEquals(3, intResult);
+
+      intResult = proxy.add(new int[] { 1, 2 });
+      assertEquals(3, intResult);
+
+      boolean caught = false;
+      try {
+        proxy.error();
+      } catch (IOException e) {
+        LOG.debug("Caught " + e);
+        caught = true;
+      }
+      assertTrue(caught);
+
+      proxy.testServerGet();
+
+      // try some multi-calls
+      Method echo = TestProtocol.class.getMethod("echo",
+          new Class[] { String.class });
+      String[] strings = (String[]) RPC.call(echo, new String[][] { { "a" },
+          { "b" } }, new InetSocketAddress[] { addr, addr }, null, conf);
+      assertTrue(Arrays.equals(strings, new String[] { "a", "b" }));
+
+      Method ping = TestProtocol.class.getMethod("ping", new Class[] {});
+      Object[] voids = (Object[]) RPC.call(ping, new Object[][] { {}, {} },
+          new InetSocketAddress[] { addr, addr }, null, conf);
+      assertNull(voids);
+    } finally {
+      server.stop();
+      if (proxy != null) {
+        RPC.stopProxy(proxy);
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+
+    new TestRPC("test").testCalls();
+
+  }
+
+}

Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/ipc/TestRPC.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestBytes.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestBytes.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestBytes.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestBytes.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import junit.framework.TestCase;
+
+/**
+ * Unit tests for the {@link Bytes} helpers: null handling in hashCode, key
+ * splitting, primitive round-trips, binary search and byte-wise increment.
+ */
+public class TestBytes extends TestCase {
+
+  /** Prints each key on its own line, decoded with {@code Bytes.toString}. */
+  private static void dump(byte[][] keys) {
+    for (byte[] key : keys) {
+      System.out.println(Bytes.toString(key));
+    }
+  }
+
+  /** {@code Bytes.hashCode} must throw some exception for a null array. */
+  public void testNullHashCode() {
+    byte[] nothing = null;
+    boolean threw = false;
+    try {
+      Bytes.hashCode(nothing);
+    } catch (Exception e) {
+      threw = true;
+    }
+    assertTrue(threw);
+  }
+
+  /**
+   * Splits the range "AAA".."EEE" and then "AAA".."DDD" and checks that
+   * "CCC" shows up at the expected position in both results.
+   */
+  public void testSplit() throws Exception {
+    final byte[] low = Bytes.toBytes("AAA");
+    final byte[] mid = Bytes.toBytes("CCC");
+
+    // Split count 1 between AAA and EEE: three keys come back, CCC second.
+    byte[][] pieces = Bytes.split(low, Bytes.toBytes("EEE"), 1);
+    dump(pieces);
+    assertEquals(3, pieces.length);
+    assertTrue(Bytes.equals(pieces[1], mid));
+
+    // Split count 2 with the upper bound lowered to DDD so the split is
+    // even: four keys come back and CCC is the third.
+    pieces = Bytes.split(low, Bytes.toBytes("DDD"), 2);
+    dump(pieces);
+    assertEquals(4, pieces.length);
+    assertTrue(Bytes.equals(pieces[2], mid));
+  }
+
+  /**
+   * Same as the first half of {@link #testSplit()} but with URL-like keys
+   * that share a common prefix.
+   */
+  public void testSplit2() throws Exception {
+    final byte[] low = Bytes.toBytes("http://A");
+    final byte[] high = Bytes.toBytes("http://z");
+    final byte[] expectedMiddle = Bytes.toBytes("http://]");
+
+    byte[][] pieces = Bytes.split(low, high, 1);
+    dump(pieces);
+    assertEquals(3, pieces.length);
+    assertTrue(Bytes.equals(pieces[1], expectedMiddle));
+  }
+
+  /** long -> bytes -> long must return the starting value. */
+  public void testToLong() throws Exception {
+    for (long sample : new long[] {-1L, 123L, 122232323232L}) {
+      byte[] encoded = Bytes.toBytes(sample);
+      assertEquals(sample, Bytes.toLong(encoded));
+    }
+  }
+
+  /** float -> bytes -> float must return the starting value. */
+  public void testToFloat() throws Exception {
+    for (float sample : new float[] {-1f, 123.123f, Float.MAX_VALUE}) {
+      byte[] encoded = Bytes.toBytes(sample);
+      assertEquals(sample, Bytes.toFloat(encoded));
+    }
+  }
+
+  /** double -> bytes -> double must return the starting value. */
+  public void testToDouble() throws Exception {
+    for (double sample : new double[] {Double.MIN_VALUE, Double.MAX_VALUE}) {
+      byte[] encoded = Bytes.toBytes(sample);
+      assertEquals(sample, Bytes.toDouble(encoded));
+    }
+  }
+
+  /**
+   * Searches a sorted table of one-byte rows using single bytes taken from
+   * two-byte keys.
+   * NOTE(review): the two int arguments of {@code Bytes.binarySearch} look
+   * like (offset, length) into the key, judging by the expected indexes --
+   * confirm against {@code Bytes}.
+   */
+  public void testBinarySearch() throws Exception {
+    byte[][] table = { {1}, {3}, {5}, {7}, {9}, {11}, {13}, {15} };
+    byte[] threeOne = {3, 1};
+    byte[] fourNine = {4, 9};
+    byte[] fourOnly = {4};
+    byte[] fiveEleven = {5, 11};
+
+    assertEquals(1,
+        Bytes.binarySearch(table, threeOne, 0, 1, Bytes.BYTES_RAWCOMPARATOR));
+    assertEquals(0,
+        Bytes.binarySearch(table, threeOne, 1, 1, Bytes.BYTES_RAWCOMPARATOR));
+    // A miss is reported as -(insertionPoint + 1), the same value
+    // java.util.Arrays.binarySearch produces for the one-byte key {4}.
+    assertEquals(-(2 + 1),
+        Arrays.binarySearch(table, fourOnly, Bytes.BYTES_COMPARATOR));
+    assertEquals(-(2 + 1),
+        Bytes.binarySearch(table, fourNine, 0, 1, Bytes.BYTES_RAWCOMPARATOR));
+    assertEquals(4,
+        Bytes.binarySearch(table, fourNine, 1, 1, Bytes.BYTES_RAWCOMPARATOR));
+    assertEquals(2,
+        Bytes.binarySearch(table, fiveEleven, 0, 1, Bytes.BYTES_RAWCOMPARATOR));
+    assertEquals(5,
+        Bytes.binarySearch(table, fiveEleven, 1, 1, Bytes.BYTES_RAWCOMPARATOR));
+  }
+
+  /**
+   * Runs {@link #checkTestIncrementBytes(long, long)} over a table of
+   * {value, amount} pairs covering every sign combination.
+   */
+  public void testIncrementBytes() throws IOException {
+    long[][] cases = {
+        {10, 1},
+        {12, 123435445},
+        {124634654, 1},
+        {10005460, 5005645},
+        {1, -1},
+        {10, -1},
+        {10, -5},
+        {1005435000, -5},
+        {10, -43657655},
+        {-1, 1},
+        {-26, 5034520},
+        {-10657200, 5},
+        {-12343250, 45376475},
+        {-10, -5},
+        {-12343250, -5},
+        {-12, -34565445},
+        {-1546543452, -34565445},
+    };
+    for (long[] pair : cases) {
+      assertTrue(checkTestIncrementBytes(pair[0], pair[1]));
+    }
+  }
+
+  /**
+   * Compares {@code Bytes.incrementBytes(toBytes(val), amount)} against plain
+   * long addition.
+   *
+   * @param val    starting value, encoded with {@code Bytes.toBytes}
+   * @param amount value to add
+   * @return true when the decoded increment result equals the expected sum
+   * @throws IOException propagated from {@code Bytes.incrementBytes}
+   */
+  private static boolean checkTestIncrementBytes(long val, long amount)
+      throws IOException {
+    byte[] encoded = Bytes.toBytes(val);
+    // Reference buffer: zero-filled when the leading byte is positive,
+    // otherwise filled with -1. The encoded bytes are then copied in
+    // right-aligned, and this copy is taken before incrementBytes runs.
+    byte[] reference = encoded[0] > 0
+        ? new byte[Bytes.SIZEOF_LONG]
+        : new byte[] {-1, -1, -1, -1, -1, -1, -1, -1};
+    int destOffset = reference.length - encoded.length;
+    System.arraycopy(encoded, 0, reference, destOffset, encoded.length);
+
+    long actual = Bytes.toLong(Bytes.incrementBytes(encoded, amount));
+    long expected = Bytes.toLong(reference) + amount;
+    return expected == actual;
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestBytes.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestNumeric.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestNumeric.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestNumeric.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestNumeric.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,34 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import junit.framework.TestCase;
+
+/**
+ * Round-trip test for the double conversions in {@link Bytes}.
+ */
+public class TestNumeric extends TestCase {
+  // Not referenced by any test in this class; kept so the declaration stays
+  // available to anything else in the package that may use it.
+  final static int TEST_INT = 3;
+  final static double TEST_DOUBLE = 0.4;
+
+  /**
+   * Encodes {@link #TEST_DOUBLE} with {@code Bytes.toBytes} and checks that
+   * {@code Bytes.toDouble} decodes exactly the same value.
+   */
+  public void testDouble() {
+    // JUnit expects (expected, actual, delta); the previous call passed the
+    // values the other way round, which would mislabel them in a failure
+    // message. A delta of 0.0 keeps the comparison exact.
+    assertEquals(TEST_DOUBLE, Bytes.toDouble(Bytes.toBytes(TEST_DOUBLE)), 0.0d);
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestNumeric.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestRandomVariable.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestRandomVariable.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestRandomVariable.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestRandomVariable.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,74 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import org.apache.log4j.Logger;
+
+import junit.framework.TestCase;
+
+/**
+ * Random variable generation test
+ */
+/**
+ * Range checks for the generators exposed by {@link RandomVariable}.
+ */
+public class TestRandomVariable extends TestCase {
+  static final Logger LOG = Logger.getLogger(TestRandomVariable.class);
+  final static int COUNT = 50;
+
+  /**
+   * Draws {@link #COUNT} values from {@code RandomVariable.rand()} and
+   * requires each one to lie within [0.0, 1.0].
+   *
+   * @throws Exception if the generator throws
+   */
+  public void testRand() throws Exception {
+    int remaining = COUNT;
+    while (remaining-- > 0) {
+      double sample = RandomVariable.rand();
+      boolean withinUnitInterval = sample >= 0.0d && sample <= 1.0;
+      assertTrue(withinUnitInterval);
+    }
+  }
+
+  /**
+   * Draws {@link #COUNT} values from {@code RandomVariable.randInt(122, 561)}
+   * and requires each one to lie within [122, 561].
+   *
+   * @throws Exception if the generator throws
+   */
+  public void testRandInt() throws Exception {
+    final int lower = 122;
+    final int upper = 561;
+
+    int remaining = COUNT;
+    while (remaining-- > 0) {
+      int drawn = RandomVariable.randInt(lower, upper);
+      boolean withinBounds = drawn >= lower && drawn <= upper;
+      assertTrue(withinBounds);
+    }
+  }
+
+  /**
+   * Draws {@link #COUNT} values from {@code RandomVariable.uniform(1.0, 3.0)}
+   * and requires each one to lie within [1.0, 3.0].
+   *
+   * @throws Exception if the generator throws
+   */
+  public void testUniform() throws Exception {
+    final double lower = 1.0d;
+    final double upper = 3.0d;
+
+    int remaining = COUNT;
+    while (remaining-- > 0) {
+      double sample = RandomVariable.uniform(lower, upper);
+      boolean withinBounds = sample >= lower && sample <= upper;
+      assertTrue(withinBounds);
+    }
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestRandomVariable.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/zookeeper/TestZKTools.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/zookeeper/TestZKTools.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/zookeeper/TestZKTools.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/zookeeper/TestZKTools.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.zookeeper;
+
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+
+import junit.framework.TestCase;
+
+/**
+ * Checks the quorum server string that {@link QuorumPeer} derives from a
+ * {@link HamaConfiguration}.
+ */
+public class TestZKTools extends TestCase {
+
+  /**
+   * With the quorum set to {@code test.com:123} and the client port set to
+   * {@code 2222}, the resulting server string must be {@code test.com:2222}.
+   */
+  public void testZKProps() {
+    final String expectedServers = "test.com:2222";
+
+    HamaConfiguration zkConf = new HamaConfiguration();
+    zkConf.set(Constants.ZOOKEEPER_QUORUM, "test.com:123");
+    zkConf.set(Constants.ZOOKEEPER_CLIENT_PORT, "2222");
+
+    assertEquals(expectedServers, QuorumPeer.getZKQuorumServersString(zkConf));
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/zookeeper/TestZKTools.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/test/java/testjar/ClassSerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/testjar/ClassSerializePrinting.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/testjar/ClassSerializePrinting.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/testjar/ClassSerializePrinting.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package testjar;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Holder for {@link HelloBSP}, a small BSP job in which each peer writes one
+ * "Hello BSP" record to a sequence file under {@code /tmp/test-example/}.
+ */
+public class ClassSerializePrinting {
+  /** Directory prefix for the per-peer output files; never reassigned. */
+  private static final String TMP_OUTPUT = "/tmp/test-example/";
+
+  public static class HelloBSP extends BSP {
+    public static final Log LOG = LogFactory.getLog(HelloBSP.class);
+    private Configuration conf;
+    /** Pause between supersteps, in milliseconds. */
+    private final static int PRINT_INTERVAL = 1000;
+    private FileSystem fileSys;
+    /** Value of the {@code bsp.peers.num} setting, read in setConf. */
+    private int num;
+
+    /**
+     * Walks over all peer names, one superstep per name. On the step whose
+     * name equals this peer's own name the peer writes its record; on every
+     * step it sleeps for {@link #PRINT_INTERVAL} ms and then calls
+     * {@code bspPeer.sync()}.
+     *
+     * @param bspPeer the peer this task runs on
+     * @throws IOException if writing the output file fails
+     * @throws KeeperException declared by the signature (presumably raised
+     *         by the sync call)
+     * @throws InterruptedException if the sleep is interrupted
+     */
+    public void bsp(BSPPeer bspPeer) throws IOException,
+        KeeperException, InterruptedException {
+
+      int i = 0;
+      for (String otherPeer : bspPeer.getAllPeerNames()) {
+        String peerName = bspPeer.getPeerName();
+        if (peerName.equals(otherPeer)) {
+          writeLogToFile(peerName, i);
+        }
+
+        Thread.sleep(PRINT_INTERVAL);
+        bspPeer.sync();
+        i++;
+      }
+    }
+
+    /**
+     * Writes a single (current time in millis, greeting) record to the
+     * sequence file {@code TMP_OUTPUT + i}.
+     *
+     * @param string the peer name embedded in the greeting
+     * @param i zero-based position of this peer; also the output file name
+     * @throws IOException if the writer cannot be created, written or closed
+     */
+    private void writeLogToFile(String string, int i) throws IOException {
+      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+          new Path(TMP_OUTPUT + i), LongWritable.class, Text.class,
+          CompressionType.NONE);
+      try {
+        writer.append(new LongWritable(System.currentTimeMillis()), new Text(
+            "Hello BSP from " + (i + 1) + " of " + num + ": " + string));
+      } finally {
+        // Close even when append() throws so the underlying stream is not
+        // leaked.
+        writer.close();
+      }
+    }
+
+    public Configuration getConf() {
+      return conf;
+    }
+
+    /**
+     * Stores the configuration, reads the {@code bsp.peers.num} setting and
+     * obtains the {@link FileSystem} used for output.
+     *
+     * @param conf job configuration; {@code bsp.peers.num} must hold an
+     *        integer, otherwise {@code Integer.parseInt} throws
+     */
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+      num = Integer.parseInt(conf.get("bsp.peers.num"));
+      try {
+        fileSys = FileSystem.get(conf);
+      } catch (IOException e) {
+        // Still best-effort: fileSys stays unset, but the failure now goes
+        // through the class logger instead of a bare stack trace on stderr.
+        LOG.error("Could not obtain FileSystem from configuration", e);
+      }
+    }
+
+  }
+
+}

Propchange: incubator/hama/branches/HamaV2/core/src/test/java/testjar/ClassSerializePrinting.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/test/java/testjar/testjob.jar
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/testjar/testjob.jar?rev=1176297&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/hama/branches/HamaV2/core/src/test/java/testjar/testjob.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Propchange: incubator/hama/branches/HamaV2/core/target/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Sep 27 09:35:21 2011
@@ -0,0 +1 @@
+*

Added: incubator/hama/branches/HamaV2/core/target/.plxarc
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/target/.plxarc?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/target/.plxarc (added)
+++ incubator/hama/branches/HamaV2/core/target/.plxarc Tue Sep 27 09:35:21 2011
@@ -0,0 +1 @@
+maven-shared-archive-resources
\ No newline at end of file

Added: incubator/hama/branches/HamaV2/core/target/classes/META-INF/DEPENDENCIES
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/target/classes/META-INF/DEPENDENCIES?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/target/classes/META-INF/DEPENDENCIES (added)
+++ incubator/hama/branches/HamaV2/core/target/classes/META-INF/DEPENDENCIES Tue Sep 27 09:35:21 2011
@@ -0,0 +1,103 @@
+// ------------------------------------------------------------------
+// Transitive dependencies of this project determined from the
+// maven pom organized by organization.
+// ------------------------------------------------------------------
+
+Apache Hama Core
+
+
+From: 'an unknown organization'
+  - geronimo-spec-jta  geronimo-spec:geronimo-spec-jta:jar:1.0.1B-rc4
+
+  - HSQLDB (http://hsqldb.org/) hsqldb:hsqldb:jar:1.8.0.10
+    License: HSQLDB License  (http://hsqldb.org/web/hsqlLicense.html)
+  - JLine (http://jline.sourceforge.net) jline:jline:jar:0.9.94
+    License: BSD  (LICENSE.txt)
+  - An open source Java toolkit for Amazon S3 (http://jets3t.s3.amazonaws.com/index.html) net.java.dev.jets3t:jets3t:jar:0.7.1
+    License: Apache License, Version 2.0  (http://www.apache.org/licenses/LICENSE-2.0)
+  - kosmosfs (http://kosmosfs.sourceforge.net/) net.sf.kosmosfs:kfs:jar:0.3
+    License: The Apache Software License, Version 2.0  (http://www.apache.org/licenses/LICENSE-2.0.txt)
+  - hadoop-core  org.apache.hadoop:hadoop-core:jar:0.20.2
+
+  - hadoop-test  org.apache.hadoop:hadoop-test:jar:0.20.2
+
+  - servlet-api  org.apache.tomcat:servlet-api:jar:6.0.32
+
+  - zookeeper  org.apache.zookeeper:zookeeper:jar:3.3.1
+
+  - Eclipse JDT Core (http://www.eclipse.org/jdt/) org.eclipse.jdt:core:jar:3.1.1
+    License: Eclipse Public License v1.0  (http://www.eclipse.org/org/documents/epl-v10.php)
+  - oro  oro:oro:jar:2.0.8
+
+  - jasper-compiler  tomcat:jasper-compiler:jar:5.5.12
+
+  - jasper-runtime  tomcat:jasper-runtime:jar:5.5.12
+
+  - xmlenc Library (http://xmlenc.sourceforge.net) xmlenc:xmlenc:jar:0.52
+    License: The BSD License  (http://www.opensource.org/licenses/bsd-license.php)
+
+From: 'Apache MINA Project' (http://mina.apache.org/)
+  - Apache MINA Core (http://mina.apache.org/mina-core) org.apache.mina:mina-core:bundle:2.0.0-M5
+    License: Apache 2.0 License  (http://www.apache.org/licenses/LICENSE-2.0)
+
+From: 'Apache Software Foundation' (http://jakarta.apache.org/)
+  - HttpClient (http://jakarta.apache.org/commons/httpclient/) commons-httpclient:commons-httpclient:jar:3.0.1
+    License: Apache License  (http://www.apache.org/licenses/LICENSE-2.0)
+
+From: 'Apache Software Foundation' (http://www.apache.org)
+  - Annotation 1.0 (http://geronimo.apache.org/specs/geronimo-annotation_1.0_spec) org.apache.geronimo.specs:geronimo-annotation_1.0_spec:jar:1.0
+    License: The Apache Software License, Version 2.0  (http://www.apache.org/licenses/LICENSE-2.0.txt)
+
+From: 'Apache Software Foundation'
+  - org.apache.tools.ant (http://ant.apache.org/ant/) org.apache.ant:ant:jar:1.7.1
+
+  - ant-launcher (http://ant.apache.org/ant-launcher/) org.apache.ant:ant-launcher:jar:1.7.1
+
+
+From: 'Mort Bay Consulting' (http://www.mortbay.com)
+  - Jetty Server (http://jetty.mortbay.org/project/modules/jetty) org.mortbay.jetty:jetty:jar:6.1.14
+    License: Apache License Version 2.0  (http://www.apache.org/licenses/LICENSE-2.0)
+  - Servlet Annotations (http://jetty.mortbay.org/project/jetty-annotations) org.mortbay.jetty:jetty-annotations:jar:6.1.14
+    License: Apache License Version 2.0  (http://www.apache.org/licenses/LICENSE-2.0)
+  - Jetty Plus (http://jetty.mortbay.org/project/jetty-plus) org.mortbay.jetty:jetty-plus:jar:6.1.14
+    License: Apache License Version 2.0  (http://www.apache.org/licenses/LICENSE-2.0)
+  - Jetty Utilities (http://jetty.mortbay.org/project/jetty-util) org.mortbay.jetty:jetty-util:jar:6.1.14
+    License: Apache License Version 2.0  (http://www.apache.org/licenses/LICENSE-2.0)
+  - Glassfish Jasper (http://jetty.mortbay.org/project/modules/jsp-2.1) org.mortbay.jetty:jsp-2.1:jar:6.1.14
+    License: CDDL 1.0  (https://glassfish.dev.java.net/public/CDDLv1.0.html)
+  - Glassfish Jasper API (http://jetty.mortbay.org/project/modules/jsp-api-2.1) org.mortbay.jetty:jsp-api-2.1:jar:6.1.14
+    License: Apache License Version 2.0  (http://www.apache.org/licenses/LICENSE-2.0)
+  - Servlet Specification 2.5 API (http://jetty.mortbay.org/project/modules/servlet-api-2.5) org.mortbay.jetty:servlet-api-2.5:jar:6.1.14
+    License: CDDL 1.0  (https://glassfish.dev.java.net/public/CDDLv1.0.html)
+
+From: 'QOS.ch' (http://www.qos.ch)
+  - SLF4J API Module (http://www.slf4j.org) org.slf4j:slf4j-api:jar:1.5.2
+
+  - SLF4J LOG4J-12 Binding (http://www.slf4j.org) org.slf4j:slf4j-log4j12:jar:1.5.2
+
+
+From: 'The Apache Software Foundation' (http://jakarta.apache.org)
+  - Codec (http://jakarta.apache.org/commons/codec/) commons-codec:commons-codec:jar:1.3
+    License: The Apache Software License, Version 2.0  (/LICENSE.txt)
+  - EL (http://jakarta.apache.org/commons/el/) commons-el:commons-el:jar:1.0
+    License: The Apache Software License, Version 2.0  (/LICENSE.txt)
+  - Jakarta Commons Net (http://jakarta.apache.org/commons/${pom.artifactId.substring(8)}/) commons-net:commons-net:jar:1.4.1
+    License: The Apache Software License, Version 2.0  (/LICENSE.txt)
+
+From: 'The Apache Software Foundation' (http://www.apache.org/)
+  - ant (http://www.apache.org/ant/) ant:ant:jar:1.6.5
+    License: The Apache Software License, Version 2.0  (http://www.apache.org/licenses/LICENSE-2.0.txt)
+  - Commons CLI (http://commons.apache.org/cli/) commons-cli:commons-cli:jar:1.2
+    License: The Apache Software License, Version 2.0  (http://www.apache.org/licenses/LICENSE-2.0.txt)
+  - Commons Logging (http://commons.apache.org/logging) commons-logging:commons-logging:jar:1.1.1
+    License: The Apache Software License, Version 2.0  (http://www.apache.org/licenses/LICENSE-2.0.txt)
+  - Apache Ftplet API (http://mina.apache.org/ftpserver) org.apache.ftpserver:ftplet-api:bundle:1.0.0
+    License: Apache 2.0 License  (http://www.apache.org/licenses/LICENSE-2.0)
+  - Apache FtpServer Core (http://mina.apache.org/ftpserver/ftpserver-core) org.apache.ftpserver:ftpserver-core:bundle:1.0.0
+    License: Apache 2.0 License  (http://www.apache.org/licenses/LICENSE-2.0)
+  - Apache FtpServer Deprecated classes (http://mina.apache.org/ftpserver/ftpserver-deprecated) org.apache.ftpserver:ftpserver-deprecated:jar:1.0.0-M2
+    License: Apache 2.0 License  (http://www.apache.org/licenses/LICENSE-2.0)
+
+
+
+