You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/10/12 04:39:14 UTC

svn commit: r1021596 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/ src/java/org/apache/hama/bsp/ src/test/org/apache/hama/ src/test/org/apache/hama/bsp/

Author: edwardyoon
Date: Tue Oct 12 02:39:13 2010
New Revision: 1021596

URL: http://svn.apache.org/viewvc?rev=1021596&view=rev
Log:
Fix unit tests bug

Added:
    incubator/hama/trunk/src/java/org/apache/hama/MiniZooKeeperCluster.java
    incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java
    incubator/hama/trunk/src/test/org/apache/hama/HamaTestCase.java
    incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java
      - copied, changed from r1021565, incubator/hama/trunk/src/test/org/apache/hama/bsp/MiniBSPCluster.java
    incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java
    incubator/hama/trunk/src/test/org/apache/hama/bsp/TestClusterStatus.java
    incubator/hama/trunk/src/test/org/apache/hama/bsp/TestSerializePrinting.java
Removed:
    incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPPeerTest.java
    incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPTestDriver.java
    incubator/hama/trunk/src/test/org/apache/hama/bsp/MiniBSPCluster.java
    incubator/hama/trunk/src/test/org/apache/hama/bsp/SerializePrinting.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/bsp/ClusterStatus.java
    incubator/hama/trunk/src/test/org/apache/hama/HamaCluster.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1021596&r1=1021595&r2=1021596&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Oct 12 02:39:13 2010
@@ -168,6 +168,7 @@ Trunk (unreleased changes)
 
   BUG FIXES
   
+    HAMA-301: Fix unit test fail bug (edwardyoon)
     HAMA-303: slave.host.name is superceded by bsp.peer.hostname 
                 (Filipe Manana via edwardyoon)
     HAMA-296: Fix warning about deprecated method (edwardyoon)

Added: incubator/hama/trunk/src/java/org/apache/hama/MiniZooKeeperCluster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/MiniZooKeeperCluster.java?rev=1021596&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/MiniZooKeeperCluster.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/MiniZooKeeperCluster.java Tue Oct 12 02:39:13 2010
@@ -0,0 +1,196 @@
+package org.apache.hama;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+
+public class MiniZooKeeperCluster {
+  private static final Log LOG = LogFactory.getLog(MiniZooKeeperCluster.class);
+
+  private static final int TICK_TIME = 2000;
+  private static final int CONNECTION_TIMEOUT = 30000;
+
+  private boolean started;
+  private int clientPort = 21810; // use non-standard port
+
+  private NIOServerCnxn.Factory standaloneServerFactory;
+  private int tickTime = 0;
+
+  /** Create mini Zookeeper cluster. */
+  public MiniZooKeeperCluster() {
+    this.started = false;
+  }
+
+  public void setClientPort(int clientPort) {
+    this.clientPort = clientPort;
+  }
+
+  public void setTickTime(int tickTime) {
+    this.tickTime = tickTime;
+  }
+
+  private static void setupTestEnv() {
+    // During the tests we run with 100K prealloc in the logs.
+    // On Windows systems a prealloc of 64M was seen to take ~15 seconds,
+    // resulting in test failure (client timeout on first session).
+    // Set the system property and FileTxnLog directly to handle static init/gc issues.
+    System.setProperty("zookeeper.preAllocSize", "100");
+    FileTxnLog.setPreallocSize(100);
+  }
+
+  /**
+   * Starts a standalone ZooKeeper server with its data under
+   * {@code baseDir/zookeeper}, trying successive ports from clientPort upwards.
+   * @param baseDir parent of the data directory, which is deleted and recreated
+   * @return ClientPort server bound to.
+   * @throws IOException if the server does not come up within CONNECTION_TIMEOUT
+   * @throws InterruptedException
+   */
+  public int startup(File baseDir) throws IOException,
+      InterruptedException {
+
+    setupTestEnv();
+
+    shutdown();
+
+    File dir = new File(baseDir, "zookeeper").getAbsoluteFile();
+    recreateDir(dir);
+
+    int tickTimeToUse = (this.tickTime > 0) ? this.tickTime : TICK_TIME;
+    ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
+    while (true) {
+      try {
+        standaloneServerFactory =
+          new NIOServerCnxn.Factory(new InetSocketAddress(clientPort));
+      } catch (BindException e) {
+        LOG.info("Faild binding ZK Server to client port: " + clientPort);
+        //this port is already in use. try to use another
+        clientPort++;
+        continue;
+      }
+      break;
+    }
+    standaloneServerFactory.startup(server);
+
+    if (!waitForServerUp(clientPort, CONNECTION_TIMEOUT)) {
+      // 'started' is still false, so a later shutdown() would be a no-op;
+      // stop the factory here so it does not outlive the failed startup
+      standaloneServerFactory.shutdown();
+      throw new IOException("Waiting for startup of standalone server");
+    }
+
+    started = true;
+
+    return clientPort;
+  }
+
+  private void recreateDir(File dir) throws IOException {
+    if (dir.exists()) {
+      FileUtil.fullyDelete(dir);
+    }
+    try {
+      dir.mkdirs();
+    } catch (SecurityException e) {
+      throw new IOException("creating dir: " + dir, e);
+    }
+  }
+
+  /** Stops the server started by {@link #startup(File)}; a no-op if none is running.
+   * @throws IOException if the port still accepts connections after CONNECTION_TIMEOUT ms
+   */
+  public void shutdown() throws IOException {
+    if (!started) {
+      return;
+    }
+
+    standaloneServerFactory.shutdown();
+    if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
+      throw new IOException("Waiting for shutdown of standalone server");
+    }
+
+    started = false;
+  }
+
+  // XXX: From o.a.zk.t.ClientBase
+  private static boolean waitForServerDown(int port, long timeout) {
+    long start = System.currentTimeMillis();
+    while (true) {
+      try {
+        Socket sock = new Socket("localhost", port);
+        try {
+          OutputStream outstream = sock.getOutputStream();
+          outstream.write("stat".getBytes());
+          outstream.flush();
+        } finally {
+          sock.close();
+        }
+      } catch (IOException e) {
+        return true;
+      }
+
+      if (System.currentTimeMillis() > start + timeout) {
+        break;
+      }
+      try {
+        Thread.sleep(250);
+      } catch (InterruptedException e) {
+        // ignore
+      }
+    }
+    return false;
+  }
+
+  // XXX: From o.a.zk.t.ClientBase
+  private static boolean waitForServerUp(int port, long timeout) {
+    long start = System.currentTimeMillis();
+    while (true) {
+      try {
+        Socket sock = new Socket("localhost", port);
+        BufferedReader reader = null;
+        try {
+          OutputStream outstream = sock.getOutputStream();
+          outstream.write("stat".getBytes());
+          outstream.flush();
+
+          Reader isr = new InputStreamReader(sock.getInputStream());
+          reader = new BufferedReader(isr);
+          String line = reader.readLine();
+          if (line != null && line.startsWith("Zookeeper version:")) {
+            return true;
+          }
+        } finally {
+          sock.close();
+          if (reader != null) {
+            reader.close();
+          }
+        }
+      } catch (IOException e) {
+        // ignore as this is expected
+        LOG.info("server localhost:" + port + " not up " + e);
+      }
+
+      if (System.currentTimeMillis() > start + timeout) {
+        break;
+      }
+      try {
+        Thread.sleep(250);
+      } catch (InterruptedException e) {
+        // ignore
+      }
+    }
+    return false;
+  }
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/ClusterStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/ClusterStatus.java?rev=1021596&r1=1021595&r2=1021596&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/ClusterStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/ClusterStatus.java Tue Oct 12 02:39:13 2010
@@ -65,16 +65,16 @@ public class ClusterStatus implements Wr
   /**
    * 
    */
-  ClusterStatus() {}
+  public ClusterStatus() {}
     
-  ClusterStatus(int grooms, int tasks, int maxTasks, BSPMaster.State state) {
+  public ClusterStatus(int grooms, int tasks, int maxTasks, BSPMaster.State state) {
     this.numActiveGrooms = grooms;
     this.tasks = tasks;
     this.maxTasks = maxTasks;
     this.state = state;
   }
   
-  ClusterStatus(Collection<String> activeGrooms, int tasks, int maxTasks,
+  public ClusterStatus(Collection<String> activeGrooms, int tasks, int maxTasks,
       BSPMaster.State state) {
     this(activeGrooms.size(), tasks, maxTasks, state);
     this.activeGrooms = activeGrooms;

Modified: incubator/hama/trunk/src/test/org/apache/hama/HamaCluster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/HamaCluster.java?rev=1021596&r1=1021595&r2=1021596&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/HamaCluster.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/HamaCluster.java Tue Oct 12 02:39:13 2010
@@ -19,35 +19,21 @@
  */
 package org.apache.hama;
 
-import junit.framework.TestCase;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hama.bsp.LocalBSPCluster;
 
 /**
- * Forming up the miniDfs and miniHbase
+ * Forming up the miniDfs and miniZooKeeper
  */
-public abstract class HamaCluster extends TestCase {
+public abstract class HamaCluster extends HamaClusterTestCase {
   public static final Log LOG = LogFactory.getLog(HamaCluster.class);
   protected final static HamaConfiguration conf = new HamaConfiguration();
 
   protected void setUp() throws Exception {
     super.setUp();
-    
-    String[] args = new String[0];
-    StringUtils.startupShutdownMessage(LocalBSPCluster.class, args, LOG);
-    HamaConfiguration conf = new HamaConfiguration();
-    //LocalBSPCluster cluster = new LocalBSPCluster(conf);
-    //cluster.startup();
   }
 
   protected static HamaConfiguration getConf() {
     return conf;
   }
-  
-  protected void setMiniBSPCluster() {
-    // TODO Auto-generated method stub    
-  }
 }

Added: incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java?rev=1021596&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java Tue Oct 12 02:39:13 2010
@@ -0,0 +1,150 @@
+package org.apache.hama;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public abstract class HamaClusterTestCase extends HamaTestCase {
+  public static final Log LOG = LogFactory.getLog(HamaClusterTestCase.class);
+  public MiniBSPCluster cluster;
+  protected MiniDFSCluster dfsCluster;
+  protected MiniZooKeeperCluster zooKeeperCluster;
+  protected int groomServers;
+  protected boolean startDfs;
+
+  /** default constructor */
+  public HamaClusterTestCase() {
+    this(1);
+  }
+
+  public HamaClusterTestCase(int groomServers) {
+    this(groomServers, true);
+  }
+
+  public HamaClusterTestCase(int groomServers, boolean startDfs) {
+    super();
+    this.startDfs = startDfs;
+    this.groomServers = groomServers;
+  }
+
+  /**
+   * Starts the MiniZooKeeperCluster under the unit test dir, then the MiniBSP instance.
+   */
+  protected void hamaClusterSetup() throws Exception {
+    File testDir = new File(getUnitTestdir(getName()).toString());
+
+    // Note that this is done before we create the MiniBSPCluster because we
+    // need to edit the config to add the ZooKeeper client port.
+    this.zooKeeperCluster = new MiniZooKeeperCluster();
+    int clientPort = this.zooKeeperCluster.startup(testDir);
+    conf.set("hbase.zookeeper.property.clientPort", Integer.toString(clientPort)); // NOTE(review): HBase-style key - confirm Hama reads it
+
+    // start the mini cluster
+    this.cluster = new MiniBSPCluster(conf, groomServers);
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    try {
+      if (this.startDfs) {
+        // This spews a bunch of warnings about missing scheme. TODO: fix.
+        this.dfsCluster = new MiniDFSCluster(0, this.conf, 2, true, true, true,
+          null, null, null, null);
+
+        // mangle the conf so that the fs parameter points to the minidfs we
+        // just started up
+        FileSystem filesystem = dfsCluster.getFileSystem();
+        conf.set("fs.defaultFS", filesystem.getUri().toString());
+        Path parentdir = filesystem.getHomeDirectory();
+        
+        filesystem.mkdirs(parentdir);
+      }
+
+      // do the super setup now. if we had done it first, then we would have
+      // gotten our conf all mangled and a local fs started up.
+      super.setUp();
+
+      // start the instance
+      hamaClusterSetup();
+
+    } catch (Exception e) {
+      LOG.error("Exception in setup!", e);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+      if (zooKeeperCluster != null) {
+        zooKeeperCluster.shutdown();
+      }
+      if (dfsCluster != null) {
+        shutdownDfs(dfsCluster);
+      }
+      throw e;
+    }
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown(); // then stop each mini cluster on its own, so one failure cannot skip the rest
+    try {
+      if (this.cluster != null) {
+        this.cluster.shutdown();
+      }
+    } catch (Exception e) {
+      LOG.warn("Shutting down mini BSP cluster", e);
+    }
+    try {
+      if (this.zooKeeperCluster != null) {
+        this.zooKeeperCluster.shutdown();
+      }
+    } catch (Exception e) {
+      LOG.warn("Shutting down ZooKeeper cluster", e);
+    }
+    try {
+      if (startDfs) { shutdownDfs(dfsCluster); }
+    } catch (Exception e) {
+      LOG.error(e);
+    }
+  }
+
+
+  /**
+   * Use this utility method debugging why cluster won't go down.  On a
+   * period it throws a thread dump.  Method ends when all cluster
+   * groom server and master threads are no longer alive.
+   */
+  public void threadDumpingJoin() {
+    if (this.cluster.getGroomServerThreads() != null) {
+      for(Thread t: this.cluster.getGroomServerThreads()) {
+        threadDumpingJoin(t);
+      }
+    }
+    threadDumpingJoin(this.cluster.getMaster());
+  }
+
+  protected void threadDumpingJoin(final Thread t) {
+    if (t == null) {
+      return;
+    }
+    long startTime = System.currentTimeMillis();
+    while (t.isAlive()) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        LOG.info("Continuing...", e);
+      }
+      if (System.currentTimeMillis() - startTime > 60000) {
+        startTime = System.currentTimeMillis();
+        ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
+            "Automatic Stack Trace every 60 seconds waiting on " +
+            t.getName());
+      }
+    }
+  }
+}

Added: incubator/hama/trunk/src/test/org/apache/hama/HamaTestCase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/HamaTestCase.java?rev=1021596&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/HamaTestCase.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/HamaTestCase.java Tue Oct 12 02:39:13 2010
@@ -0,0 +1,159 @@
+package org.apache.hama;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.AssertionFailedError;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hama.util.Bytes;
+
+public abstract class HamaTestCase extends TestCase {
+  private static Log LOG = LogFactory.getLog(HamaTestCase.class);
+  
+  /** configuration parameter name for test directory */
+  public static final String TEST_DIRECTORY_KEY = "test.build.data";
+
+  private boolean localfs = false;
+  protected Path testDir = null;
+  protected FileSystem fs = null;
+  
+  static {
+    initialize();
+  }
+
+  public volatile HamaConfiguration conf;
+
+  /** constructor */
+  public HamaTestCase() {
+    super();
+    init();
+  }
+  
+  /**
+   * @param name
+   */
+  public HamaTestCase(String name) {
+    super(name);
+    init();
+  }
+  
+  private void init() {
+    conf = new HamaConfiguration();
+  }
+
+  /**
+   * Note that this method must be called after the mini hdfs cluster has
+   * started or we end up with a local file system.
+   */
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    localfs =
+      (conf.get("fs.defaultFS", "file:///").compareTo("file:///") == 0);
+
+    if (fs == null) {
+      this.fs = FileSystem.get(conf);
+    }
+    try {
+      if (localfs) {
+        this.testDir = getUnitTestdir(getName());
+        if (fs.exists(testDir)) {
+          fs.delete(testDir, true);
+        }
+      } else {
+        this.testDir =
+          this.fs.makeQualified(new Path("/tmp/hama-test"));
+      }
+    } catch (Exception e) {
+      LOG.fatal("error during setup", e);
+      throw e;
+    }
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    try {
+      if (localfs) {
+        if (this.fs.exists(testDir)) {
+          this.fs.delete(testDir, true);
+        }
+      }
+    } catch (Exception e) {
+      LOG.fatal("error during tear down", e);
+    }
+    super.tearDown();
+  }
+
+  protected Path getUnitTestdir(String testName) {
+    return new Path(
+        conf.get(TEST_DIRECTORY_KEY, "test/build/data"), testName);
+  }
+
+  /**
+   * Initializes parameters used in the test environment:
+   *
+   * Sets the system property TEST_DIRECTORY_KEY to the absolute path of
+   * "build/hama/test" if it is not already set. Nothing else is configured
+   * here; in particular no debugging or logging setup is performed.
+   * Called from this class's static initializer.
+   */
+  public static void initialize() {
+    if (System.getProperty(TEST_DIRECTORY_KEY) == null) {
+      System.setProperty(TEST_DIRECTORY_KEY, new File(
+          "build/hama/test").getAbsolutePath());
+    }
+  }
+
+  /**
+   * Common method to close down a MiniDFSCluster and the associated file system
+   *
+   * @param cluster
+   */
+  public static void shutdownDfs(MiniDFSCluster cluster) {
+    if (cluster != null) {
+      LOG.info("Shutting down Mini DFS ");
+      try {
+        cluster.shutdown();
+      } catch (Exception e) {
+        /// Can get a java.lang.reflect.UndeclaredThrowableException thrown
+        // here because of an InterruptedException. Don't let exceptions in
+        // here be cause of test failure.
+      }
+      try {
+        FileSystem fs = cluster.getFileSystem();
+        if (fs != null) {
+          LOG.info("Shutting down FileSystem");
+          fs.close();
+        }
+        FileSystem.closeAll();
+      } catch (IOException e) {
+        LOG.error("error closing file system", e);
+      }
+    }
+  }
+
+  public void assertByteEquals(byte[] expected,
+                               byte[] actual) {
+    if (Bytes.compareTo(expected, actual) != 0) {
+      throw new AssertionFailedError("expected:<" +
+      Bytes.toString(expected) + "> but was:<" +
+      Bytes.toString(actual) + ">");
+    }
+  }
+
+  public static void assertEquals(byte[] expected,
+                               byte[] actual) {
+    if (Bytes.compareTo(expected, actual) != 0) {
+      throw new AssertionFailedError("expected:<" +
+      Bytes.toStringBinary(expected) + "> but was:<" +
+      Bytes.toStringBinary(actual) + ">");
+    }
+  }
+
+}

Copied: incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java (from r1021565, incubator/hama/trunk/src/test/org/apache/hama/bsp/MiniBSPCluster.java)
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java?p2=incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java&p1=incubator/hama/trunk/src/test/org/apache/hama/bsp/MiniBSPCluster.java&r1=1021565&r2=1021596&rev=1021596&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/MiniBSPCluster.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java Tue Oct 12 02:39:13 2010
@@ -1,5 +1,28 @@
-package org.apache.hama.bsp;
+package org.apache.hama;
+
+import java.util.List;
+
+import org.apache.hama.HamaConfiguration;
 
 public class MiniBSPCluster {
 
+  public MiniBSPCluster(HamaConfiguration conf, int groomServers) {
+    // TODO Auto-generated constructor stub
+  }
+
+  public void shutdown() {
+    // TODO Auto-generated method stub
+    
+  }
+
+  public List<Thread> getGroomServerThreads() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  public Thread getMaster() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
 }

Added: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java?rev=1021596&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java Tue Oct 12 02:39:13 2010
@@ -0,0 +1,179 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.util.Bytes;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+public class TestBSPPeer extends HamaCluster implements Watcher {
+  private Log LOG = LogFactory.getLog(TestBSPPeer.class);
+
+  private static final int NUM_PEER = 35;
+  private static final int ROUND = 3;
+  private static final int PAYLOAD = 1024; // 1kb in default
+  List<BSPPeerThread> list = new ArrayList<BSPPeerThread>(NUM_PEER);
+  Configuration conf;
+  private Random r = new Random();
+
+  public TestBSPPeer() {
+    this.conf = getConf();
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+
+    // make sure the ZooKeeper root znode exists before any peer is created
+    ZooKeeper zk = new ZooKeeper("localhost:21810", 3000, this);
+    Stat s = null;
+    try {
+      s = zk.exists(Constants.DEFAULT_ZOOKEEPER_ROOT, false);
+    } catch (Exception e) {
+      LOG.error(e); // log the failure itself; s is still null here
+    }
+
+    if (s == null) {
+      try {
+        zk.create(Constants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
+            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      } catch (KeeperException e) {
+        LOG.error(e);
+      } catch (InterruptedException e) {
+        LOG.error(e);
+      }
+    }
+    zk.close(); // the znode is PERSISTENT, so this setup-only session can end here
+  }
+
+  public class BSPPeerThread extends Thread {
+    private BSPPeer peer;
+    private int MAXIMUM_DURATION = 5;    
+
+    public BSPPeerThread(Configuration conf) throws IOException {
+      this.peer = new BSPPeer(conf);
+    }
+
+    @Override
+    public void run() {
+      int randomTime;
+      byte[] dummyData = new byte[PAYLOAD];
+      BSPMessage msg = null;
+      InetSocketAddress addr = null;
+
+      for (int i = 0; i < ROUND; i++) {
+        randomTime = r.nextInt(MAXIMUM_DURATION) + 5;
+
+        for (int j = 0; j < 10; j++) {
+          r.nextBytes(dummyData);
+          msg = new BSPMessage(Bytes.tail(dummyData, 128), dummyData);
+          addr = new InetSocketAddress("localhost", 30000 + j);
+          try {
+            peer.send(addr, msg);
+          } catch (IOException e) {
+            LOG.info(e);
+          }
+        }
+
+        try {
+          Thread.sleep(randomTime * 1000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+
+        try {
+          peer.sync();
+        } catch (IOException e) {
+          e.printStackTrace();
+        } catch (KeeperException e) {
+          e.printStackTrace();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+
+        verifyPayload();
+      }
+    }
+
+    private void verifyPayload() {
+      System.out.println("[" + getName() + "] verifying "
+          + peer.localQueue.size() + " messages");
+      BSPMessage msg = null;
+
+      try {
+        while ((msg = peer.getCurrentMessage()) != null) { // tag must equal the last 128 bytes of data
+          assertEquals(0, Bytes.compareTo(msg.tag, 0, 128, msg.data,
+              msg.data.length - 128, 128)); // JUnit order: expected first, then actual
+        }
+      } catch (IOException e) {
+        LOG.error(e);
+      }
+
+      peer.localQueue.clear();
+    }
+    
+    public BSPPeer getBSPPeer() {
+      return this.peer;
+    }
+  }
+
+  public void testSync() throws InterruptedException, IOException {
+
+    BSPPeerThread thread;
+    conf.setInt("bsp.peers.num", NUM_PEER);
+    conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+    conf.set(Constants.PEER_HOST, "localhost");
+    conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
+    
+    for (int i = 0; i < NUM_PEER; i++) {
+      conf.set(Constants.PEER_PORT, String.valueOf(30000 + i));
+      thread = new BSPPeerThread(conf);
+      list.add(thread);
+    }
+
+    for (int i = 0; i < NUM_PEER; i++) {
+      list.get(i).start();
+    }
+
+    for (int i = 0; i < NUM_PEER; i++) {
+      list.get(i).join();
+    }
+  }
+  
+  @Override
+  public void process(WatchedEvent event) {
+  }
+}

Added: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestClusterStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestClusterStatus.java?rev=1021596&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestClusterStatus.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestClusterStatus.java Tue Oct 12 02:39:13 2010
@@ -0,0 +1,58 @@
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hama.bsp.BSPMaster;
+import org.apache.hama.bsp.ClusterStatus;
+
+public class TestClusterStatus extends TestCase {
+  Random rnd = new Random();
+  
+  protected void setUp() throws Exception {
+    super.setUp();
+  }
+
+  public final void testWriteAndReadFields() throws IOException {
+    DataOutputBuffer out = new DataOutputBuffer();
+    DataInputBuffer in = new DataInputBuffer();
+    
+    ClusterStatus status1;
+    List<String> grooms = new ArrayList<String>();
+        
+    for(int i=0;i< 10;i++) {
+      grooms.add("groom_"+rnd.nextInt());
+    }
+    
+    int tasks = rnd.nextInt(100);
+    int maxTasks = rnd.nextInt(100);
+    BSPMaster.State state = BSPMaster.State.RUNNING;
+    
+    status1 = new ClusterStatus(grooms, tasks, maxTasks, state);    
+    status1.write(out);
+    
+    in.reset(out.getData(), out.getLength());
+   
+    ClusterStatus status2 = new ClusterStatus();
+    status2.readFields(in);
+    
+    Set<String> grooms_s = new HashSet<String>(status1.getActiveGroomNames());
+    Set<String> grooms_o = new HashSet<String>(status2.getActiveGroomNames());     
+    
+    assertEquals(status1.getGroomServers(), status2.getGroomServers());
+    
+    assertTrue(grooms_s.containsAll(grooms_o));
+    assertTrue(grooms_o.containsAll(grooms_s));    
+    
+    assertEquals(status1.getTasks(),status2.getTasks());
+    assertEquals(status1.getMaxTasks(), status2.getMaxTasks());
+  }
+}

Added: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestSerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestSerializePrinting.java?rev=1021596&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestSerializePrinting.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestSerializePrinting.java Tue Oct 12 02:39:13 2010
@@ -0,0 +1,122 @@
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Serialize Printing of Hello World
+ */
+public class TestSerializePrinting extends HamaCluster implements Watcher {
+  private Log LOG = LogFactory.getLog(TestSerializePrinting.class);
+  private int NUM_PEER = 10;
+  List<BSPPeerThread> list = new ArrayList<BSPPeerThread>(NUM_PEER);
+  List<String> echo = new ArrayList<String>();
+  Configuration conf;
+
+  public TestSerializePrinting() {
+    this.conf = getConf();
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+
+    // make sure the ZooKeeper root znode exists before any peer is created
+    ZooKeeper zk = new ZooKeeper("localhost:21810", 3000, this);
+    Stat s = null;
+    try {
+      s = zk.exists(Constants.DEFAULT_ZOOKEEPER_ROOT, false);
+    } catch (Exception e) {
+      LOG.error(e); // log the failure itself; s is still null here
+    }
+
+    if (s == null) {
+      try {
+        zk.create(Constants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
+            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      } catch (KeeperException e) {
+        LOG.error(e);
+      } catch (InterruptedException e) {
+        LOG.error(e);
+      }
+    }
+    zk.close(); // the znode is PERSISTENT, so this setup-only session can end here
+  }
+
+  public void testHelloWorld() throws InterruptedException, IOException {
+    BSPPeerThread thread;
+    int[] randomSequence = new int[] { 2, 3, 4, 5, 0, 1, 6, 7, 8, 9 };
+    for (int i = 0; i < NUM_PEER; i++) {
+      conf.setInt("bsp.peers.num", NUM_PEER);
+      conf.set(Constants.PEER_HOST, "localhost");
+      conf.set(Constants.PEER_PORT, String
+          .valueOf(30000 + randomSequence[i]));
+      conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
+      thread = new BSPPeerThread(conf, randomSequence[i]);
+      System.out.println(randomSequence[i] + ", " + thread.getName());
+      list.add(thread);
+    }
+
+    for (int i = 0; i < NUM_PEER; i++) {
+      list.get(i).start();
+    }
+
+    for (int i = 0; i < NUM_PEER; i++) {
+      list.get(i).join();
+    }
+  }
+
+  public class BSPPeerThread extends Thread {
+    private BSPPeer peer;
+    private int myId;
+
+    public BSPPeerThread(Configuration conf, int myId) throws IOException {
+      conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+      
+      this.peer = new BSPPeer(conf);
+      this.myId = myId;
+    }
+
+    @Override
+    public void run() {
+      for (int i = 0; i < NUM_PEER; i++) {
+        if (myId == i) {
+          echo.add(getName());
+          System.out.println("Hello BSP from " + i + " of " + NUM_PEER + ": "
+              + getName());
+        }
+
+        try {
+          Thread.sleep(2000);
+          peer.sync();
+        } catch (IOException e) {
+          e.printStackTrace();
+        } catch (KeeperException e) {
+          e.printStackTrace();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+
+      }
+    }
+  }
+
+  @Override
+  public void process(WatchedEvent event) {
+    // TODO Auto-generated method stub
+
+  }
+}