You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/10/12 04:39:14 UTC
svn commit: r1021596 - in /incubator/hama/trunk: ./
src/java/org/apache/hama/ src/java/org/apache/hama/bsp/
src/test/org/apache/hama/ src/test/org/apache/hama/bsp/
Author: edwardyoon
Date: Tue Oct 12 02:39:13 2010
New Revision: 1021596
URL: http://svn.apache.org/viewvc?rev=1021596&view=rev
Log:
Fix unit tests bug
Added:
incubator/hama/trunk/src/java/org/apache/hama/MiniZooKeeperCluster.java
incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java
incubator/hama/trunk/src/test/org/apache/hama/HamaTestCase.java
incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java
- copied, changed from r1021565, incubator/hama/trunk/src/test/org/apache/hama/bsp/MiniBSPCluster.java
incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java
incubator/hama/trunk/src/test/org/apache/hama/bsp/TestClusterStatus.java
incubator/hama/trunk/src/test/org/apache/hama/bsp/TestSerializePrinting.java
Removed:
incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPPeerTest.java
incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPTestDriver.java
incubator/hama/trunk/src/test/org/apache/hama/bsp/MiniBSPCluster.java
incubator/hama/trunk/src/test/org/apache/hama/bsp/SerializePrinting.java
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/java/org/apache/hama/bsp/ClusterStatus.java
incubator/hama/trunk/src/test/org/apache/hama/HamaCluster.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1021596&r1=1021595&r2=1021596&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Oct 12 02:39:13 2010
@@ -168,6 +168,7 @@ Trunk (unreleased changes)
BUG FIXES
+ HAMA-301: Fix unit test fail bug (edwardyoon)
HAMA-303: slave.host.name is superceded by bsp.peer.hostname
(Filipe Manana via edwardyoon)
HAMA-296: Fix warning about deprecated method (edwardyoon)
Added: incubator/hama/trunk/src/java/org/apache/hama/MiniZooKeeperCluster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/MiniZooKeeperCluster.java?rev=1021596&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/MiniZooKeeperCluster.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/MiniZooKeeperCluster.java Tue Oct 12 02:39:13 2010
@@ -0,0 +1,196 @@
+package org.apache.hama;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+
+public class MiniZooKeeperCluster {
+ private static final Log LOG = LogFactory.getLog(MiniZooKeeperCluster.class);
+
+ private static final int TICK_TIME = 2000;
+ private static final int CONNECTION_TIMEOUT = 30000;
+
+ private boolean started;
+ private int clientPort = 21810; // use non-standard port
+
+ private NIOServerCnxn.Factory standaloneServerFactory;
+ private int tickTime = 0;
+
+ /**
+ * Create mini Zookeeper cluster. The new instance is marked as not started;
+ * call {@link #startup(File)} to launch the server.
+ */
+ public MiniZooKeeperCluster() {
+ this.started = false;
+ }
+
+ /**
+ * Sets the client port that {@link #startup(File)} tries to bind first. If
+ * that port is already in use, startup moves on to the next higher port.
+ *
+ * @param clientPort first port to try
+ */
+ public void setClientPort(int clientPort) {
+ this.clientPort = clientPort;
+ }
+
+ /**
+ * Sets the tick time handed to the ZooKeeper server by
+ * {@link #startup(File)}. A value that is not positive is ignored there and
+ * TICK_TIME is used instead.
+ *
+ * @param tickTime tick time to use
+ */
+ public void setTickTime(int tickTime) {
+ this.tickTime = tickTime;
+ }
+
+ /**
+ * Shrinks the ZooKeeper transaction log preallocation size for tests, both
+ * through the "zookeeper.preAllocSize" system property and directly on
+ * FileTxnLog.
+ */
+ private static void setupTestEnv() {
+ // during the tests we run with 100K prealloc in the logs.
+ // on windows systems prealloc of 64M was seen to take ~15seconds
+ // resulting in test failure (client timeout on first session).
+ // set the system property and also set the value directly in order to
+ // handle static init/gc issues
+ System.setProperty("zookeeper.preAllocSize", "100");
+ FileTxnLog.setPreallocSize(100);
+ }
+
+ /**
+ * @param baseDir
+ * @return ClientPort server bound to.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public int startup(File baseDir) throws IOException,
+ InterruptedException {
+
+ setupTestEnv();
+
+ shutdown();
+
+ File dir = new File(baseDir, "zookeeper").getAbsoluteFile();
+ recreateDir(dir);
+
+ int tickTimeToUse;
+ if (this.tickTime > 0) {
+ tickTimeToUse = this.tickTime;
+ } else {
+ tickTimeToUse = TICK_TIME;
+ }
+ ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
+ while (true) {
+ try {
+ standaloneServerFactory =
+ new NIOServerCnxn.Factory(new InetSocketAddress(clientPort));
+ } catch (BindException e) {
+ LOG.info("Faild binding ZK Server to client port: " + clientPort);
+ //this port is already in use. try to use another
+ clientPort++;
+ continue;
+ }
+ break;
+ }
+ standaloneServerFactory.startup(server);
+
+ if (!waitForServerUp(clientPort, CONNECTION_TIMEOUT)) {
+ throw new IOException("Waiting for startup of standalone server");
+ }
+
+ started = true;
+
+ return clientPort;
+ }
+
+  /**
+   * Removes <code>dir</code> if it exists and creates it again.
+   *
+   * @param dir directory to recreate
+   * @throws IOException if the directory cannot be created
+   */
+  private void recreateDir(File dir) throws IOException {
+    if (dir.exists() && !FileUtil.fullyDelete(dir)) {
+      // leftovers are tolerated, but leave a trace for whoever debugs the test
+      LOG.warn("Could not fully delete " + dir);
+    }
+    try {
+      // mkdirs() reports failure through its return value, not an exception
+      if (!dir.mkdirs() && !dir.isDirectory()) {
+        throw new IOException("creating dir: " + dir);
+      }
+    } catch (SecurityException e) {
+      throw new IOException("creating dir: " + dir, e);
+    }
+  }
+
+ /**
+ * @throws IOException
+ */
+ public void shutdown() throws IOException {
+ if (!started) {
+ return;
+ }
+
+ standaloneServerFactory.shutdown();
+ if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
+ throw new IOException("Waiting for shutdown of standalone server");
+ }
+
+ started = false;
+ }
+
+ // XXX: From o.a.zk.t.ClientBase
+ private static boolean waitForServerDown(int port, long timeout) {
+ long start = System.currentTimeMillis();
+ while (true) {
+ try {
+ Socket sock = new Socket("localhost", port);
+ try {
+ OutputStream outstream = sock.getOutputStream();
+ outstream.write("stat".getBytes());
+ outstream.flush();
+ } finally {
+ sock.close();
+ }
+ } catch (IOException e) {
+ return true;
+ }
+
+ if (System.currentTimeMillis() > start + timeout) {
+ break;
+ }
+ try {
+ Thread.sleep(250);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ return false;
+ }
+
+  // XXX: From o.a.zk.t.ClientBase
+  /**
+   * Polls <code>localhost:port</code> every 250 ms with the "stat" command
+   * until the first line of the reply starts with "Zookeeper version:".
+   *
+   * @param port client port to probe
+   * @param timeout how long to keep probing, in milliseconds
+   * @return true once the server answered, false if it did not answer before
+   *         the timeout or the calling thread was interrupted (the interrupt
+   *         status is restored in that case)
+   */
+  private static boolean waitForServerUp(int port, long timeout) {
+    long start = System.currentTimeMillis();
+    while (true) {
+      try {
+        Socket sock = new Socket("localhost", port);
+        try {
+          OutputStream outstream = sock.getOutputStream();
+          outstream.write("stat".getBytes());
+          outstream.flush();
+
+          Reader isr = new InputStreamReader(sock.getInputStream());
+          BufferedReader reader = new BufferedReader(isr);
+          String line = reader.readLine();
+          if (line != null && line.startsWith("Zookeeper version:")) {
+            return true;
+          }
+        } finally {
+          // closing the socket also closes its input and output streams
+          sock.close();
+        }
+      } catch (IOException e) {
+        // ignore as this is expected
+        LOG.info("server localhost:" + port + " not up " + e);
+      }
+
+      if (System.currentTimeMillis() > start + timeout) {
+        break;
+      }
+      try {
+        Thread.sleep(250);
+      } catch (InterruptedException e) {
+        // stop polling instead of spinning until the timeout, and keep the
+        // interrupt visible to the caller
+        Thread.currentThread().interrupt();
+        break;
+      }
+    }
+    return false;
+  }
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/ClusterStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/ClusterStatus.java?rev=1021596&r1=1021595&r2=1021596&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/ClusterStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/ClusterStatus.java Tue Oct 12 02:39:13 2010
@@ -65,16 +65,16 @@ public class ClusterStatus implements Wr
/**
*
*/
- ClusterStatus() {}
+ public ClusterStatus() {}
- ClusterStatus(int grooms, int tasks, int maxTasks, BSPMaster.State state) {
+ public ClusterStatus(int grooms, int tasks, int maxTasks, BSPMaster.State state) {
this.numActiveGrooms = grooms;
this.tasks = tasks;
this.maxTasks = maxTasks;
this.state = state;
}
- ClusterStatus(Collection<String> activeGrooms, int tasks, int maxTasks,
+ public ClusterStatus(Collection<String> activeGrooms, int tasks, int maxTasks,
BSPMaster.State state) {
this(activeGrooms.size(), tasks, maxTasks, state);
this.activeGrooms = activeGrooms;
Modified: incubator/hama/trunk/src/test/org/apache/hama/HamaCluster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/HamaCluster.java?rev=1021596&r1=1021595&r2=1021596&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/HamaCluster.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/HamaCluster.java Tue Oct 12 02:39:13 2010
@@ -19,35 +19,21 @@
*/
package org.apache.hama;
-import junit.framework.TestCase;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hama.bsp.LocalBSPCluster;
/**
- * Forming up the miniDfs and miniHbase
+ * Forming up the miniDfs and miniZooKeeper
*/
-public abstract class HamaCluster extends TestCase {
+public abstract class HamaCluster extends HamaClusterTestCase {
public static final Log LOG = LogFactory.getLog(HamaCluster.class);
protected final static HamaConfiguration conf = new HamaConfiguration();
protected void setUp() throws Exception {
super.setUp();
-
- String[] args = new String[0];
- StringUtils.startupShutdownMessage(LocalBSPCluster.class, args, LOG);
- HamaConfiguration conf = new HamaConfiguration();
- //LocalBSPCluster cluster = new LocalBSPCluster(conf);
- //cluster.startup();
}
protected static HamaConfiguration getConf() {
return conf;
}
-
- protected void setMiniBSPCluster() {
- // TODO Auto-generated method stub
- }
}
Added: incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java?rev=1021596&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java Tue Oct 12 02:39:13 2010
@@ -0,0 +1,150 @@
+package org.apache.hama;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public abstract class HamaClusterTestCase extends HamaTestCase {
+ public static final Log LOG = LogFactory.getLog(HamaClusterTestCase.class);
+ public MiniBSPCluster cluster;
+ protected MiniDFSCluster dfsCluster;
+ protected MiniZooKeeperCluster zooKeeperCluster;
+ protected int groomServers;
+ protected boolean startDfs;
+
+ /** default constructor: one groom server, mini DFS started */
+ public HamaClusterTestCase() {
+ this(1);
+ }
+
+ /**
+ * Test case with the given number of groom servers and the mini DFS started.
+ *
+ * @param groomServers number of groom servers passed to the MiniBSPCluster
+ */
+ public HamaClusterTestCase(int groomServers) {
+ this(groomServers, true);
+ }
+
+ /**
+ * @param groomServers number of groom servers passed to the MiniBSPCluster
+ * @param startDfs whether setUp should start a MiniDFSCluster
+ */
+ public HamaClusterTestCase(int groomServers, boolean startDfs) {
+ super();
+ this.startDfs = startDfs;
+ this.groomServers = groomServers;
+ }
+
+  /**
+   * Brings up the mini ZooKeeper server and then creates the MiniBSP
+   * instance.
+   */
+  protected void hamaClusterSetup() throws Exception {
+    File zkBaseDir = new File(getUnitTestdir(getName()).toString());
+
+    // ZooKeeper is started before the MiniBSPCluster is created because the
+    // port it actually bound to has to be written into the config first.
+    this.zooKeeperCluster = new MiniZooKeeperCluster();
+    int zkPort = this.zooKeeperCluster.startup(zkBaseDir);
+    // NOTE(review): this key looks inherited from HBase -- confirm that Hama
+    // reads it.
+    conf.set("hbase.zookeeper.property.clientPort", String.valueOf(zkPort));
+
+    // start the mini cluster
+    this.cluster = new MiniBSPCluster(conf, groomServers);
+  }
+
+  /**
+   * Starts the mini DFS (if requested), runs the base set-up and then starts
+   * ZooKeeper and the MiniBSP instance. If any step fails, whatever was
+   * already started is shut down again and the original exception is
+   * rethrown.
+   */
+  @Override
+  protected void setUp() throws Exception {
+    try {
+      if (this.startDfs) {
+        // This spews a bunch of warnings about missing scheme. TODO: fix.
+        this.dfsCluster = new MiniDFSCluster(0, this.conf, 2, true, true, true,
+            null, null, null, null);
+
+        // mangle the conf so that the fs parameter points to the minidfs we
+        // just started up
+        FileSystem filesystem = dfsCluster.getFileSystem();
+        conf.set("fs.defaultFS", filesystem.getUri().toString());
+        Path parentdir = filesystem.getHomeDirectory();
+
+        filesystem.mkdirs(parentdir);
+      }
+
+      // do the super setup now. if we had done it first, then we would have
+      // gotten our conf all mangled and a local fs started up.
+      super.setUp();
+
+      // start the instance
+      hamaClusterSetup();
+
+    } catch (Exception e) {
+      LOG.error("Exception in setup!", e);
+      // Cleanup is best effort: a failure in here must not replace the
+      // exception that made the set-up fail in the first place.
+      if (cluster != null) {
+        try {
+          cluster.shutdown();
+        } catch (Exception cleanupFailure) {
+          LOG.warn("Shutting down mini BSP cluster", cleanupFailure);
+        }
+      }
+      if (zooKeeperCluster != null) {
+        try {
+          zooKeeperCluster.shutdown();
+        } catch (IOException cleanupFailure) {
+          LOG.warn("Shutting down ZooKeeper cluster", cleanupFailure);
+        }
+      }
+      if (dfsCluster != null) {
+        shutdownDfs(dfsCluster);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Runs the base tear-down and then shuts down the MiniBSP instance, the
+   * ZooKeeper server and (if it was started) the mini DFS. Each of them is
+   * shut down independently of the others; failures are logged, not thrown.
+   */
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+    try {
+      if (this.cluster != null) {
+        try {
+          this.cluster.shutdown();
+        } catch (Exception e) {
+          LOG.warn("Shutting down mini BSP cluster", e);
+        }
+      }
+      // ZooKeeper is started before the BSP cluster, so it can be running
+      // even when 'cluster' was never created.
+      if (this.zooKeeperCluster != null) {
+        try {
+          this.zooKeeperCluster.shutdown();
+        } catch (IOException e) {
+          LOG.warn("Shutting down ZooKeeper cluster", e);
+        }
+      }
+      if (startDfs) {
+        shutdownDfs(dfsCluster);
+      }
+    } catch (Exception e) {
+      LOG.error("Exception in tear down", e);
+    }
+  }
+
+
+  /**
+   * Use this utility method debugging why cluster won't go down. On a
+   * period it throws a thread dump. Method ends when all groom server
+   * threads and the master thread of the cluster are no longer alive.
+   * Does nothing when no cluster has been created.
+   */
+  public void threadDumpingJoin() {
+    if (this.cluster == null) {
+      return;
+    }
+    // read once so the null check and the iteration see the same value
+    Iterable<Thread> groomThreads = this.cluster.getGroomServerThreads();
+    if (groomThreads != null) {
+      for (Thread t : groomThreads) {
+        threadDumpingJoin(t);
+      }
+    }
+    threadDumpingJoin(this.cluster.getMaster());
+  }
+
+  /**
+   * Waits until <code>t</code> is no longer alive, checking once a second
+   * and printing thread info to System.out whenever more than 60 seconds
+   * have passed since the last dump.
+   *
+   * @param t thread to wait for; null returns immediately
+   */
+  protected void threadDumpingJoin(final Thread t) {
+    if (t != null) {
+      long lastDump = System.currentTimeMillis();
+      while (t.isAlive()) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          LOG.info("Continuing...", e);
+        }
+        boolean dumpDue = System.currentTimeMillis() - lastDump > 60000;
+        if (dumpDue) {
+          lastDump = System.currentTimeMillis();
+          String title = "Automatic Stack Trace every 60 seconds waiting on " +
+              t.getName();
+          ReflectionUtils.printThreadInfo(new PrintWriter(System.out), title);
+        }
+      }
+    }
+  }
+}
Added: incubator/hama/trunk/src/test/org/apache/hama/HamaTestCase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/HamaTestCase.java?rev=1021596&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/HamaTestCase.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/HamaTestCase.java Tue Oct 12 02:39:13 2010
@@ -0,0 +1,159 @@
+package org.apache.hama;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.AssertionFailedError;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hama.util.Bytes;
+
+public abstract class HamaTestCase extends TestCase {
+ private static Log LOG = LogFactory.getLog(HamaTestCase.class);
+
+ /** configuration parameter name for test directory */
+ public static final String TEST_DIRECTORY_KEY = "test.build.data";
+
+ private boolean localfs = false;
+ protected Path testDir = null;
+ protected FileSystem fs = null;
+
+ static {
+ initialize();
+ }
+
+ public volatile HamaConfiguration conf;
+
+ /** constructor; creates a fresh HamaConfiguration for this test */
+ public HamaTestCase() {
+ super();
+ init();
+ }
+
+ /**
+ * Creates a named test with a fresh HamaConfiguration.
+ *
+ * @param name name handed to the TestCase constructor
+ */
+ public HamaTestCase(String name) {
+ super(name);
+ init();
+ }
+
+ /** Shared constructor logic: gives this test its own configuration. */
+ private void init() {
+ conf = new HamaConfiguration();
+ }
+
+  /**
+   * Picks the file system and the test directory for this test.
+   *
+   * Note that this method must be called after the mini hdfs cluster has
+   * started or we end up with a local file system. On a local file system
+   * the per-test directory is wiped if it already exists; otherwise
+   * /tmp/hama-test on the configured file system is used.
+   */
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    String defaultFs = conf.get("fs.defaultFS", "file:///");
+    localfs = defaultFs.equals("file:///");
+
+    if (fs == null) {
+      this.fs = FileSystem.get(conf);
+    }
+    try {
+      if (!localfs) {
+        Path sharedTestDir = new Path("/tmp/hama-test");
+        this.testDir = this.fs.makeQualified(sharedTestDir);
+      } else {
+        this.testDir = getUnitTestdir(getName());
+        if (fs.exists(testDir)) {
+          fs.delete(testDir, true);
+        }
+      }
+    } catch (Exception e) {
+      LOG.fatal("error during setup", e);
+      throw e;
+    }
+  }
+
+  /**
+   * Removes the test directory when running on the local file system, then
+   * runs the TestCase tear-down. Cleanup failures are logged, not thrown.
+   */
+  @Override
+  protected void tearDown() throws Exception {
+    try {
+      if (localfs && this.fs.exists(testDir)) {
+        this.fs.delete(testDir, true);
+      }
+    } catch (Exception e) {
+      LOG.fatal("error during tear down", e);
+    }
+    super.tearDown();
+  }
+
+  /**
+   * @param testName name of the test, used as the last path component
+   * @return <code>testName</code> below the directory configured under
+   *         TEST_DIRECTORY_KEY ("test/build/data" if the key is not set)
+   */
+  protected Path getUnitTestdir(String testName) {
+    String baseDir = conf.get(TEST_DIRECTORY_KEY, "test/build/data");
+    return new Path(baseDir, testName);
+  }
+
+ /**
+ * Initializes parameters used in the test environment:
+ *
+ * Sets the system property TEST_DIRECTORY_KEY to the absolute path of
+ * "build/hama/test" if it is not already set. Nothing else is configured
+ * here.
+ */
+ public static void initialize() {
+ if (System.getProperty(TEST_DIRECTORY_KEY) == null) {
+ System.setProperty(TEST_DIRECTORY_KEY, new File(
+ "build/hama/test").getAbsolutePath());
+ }
+ }
+
+  /**
+   * Common method to close down a MiniDFSCluster and the associated file
+   * system. Best effort: failures are logged and never thrown.
+   *
+   * @param cluster cluster to shut down; null is ignored
+   */
+  public static void shutdownDfs(MiniDFSCluster cluster) {
+    if (cluster == null) {
+      return;
+    }
+    LOG.info("Shutting down Mini DFS ");
+    try {
+      cluster.shutdown();
+    } catch (Exception e) {
+      // Can get a java.lang.reflect.UndeclaredThrowableException thrown
+      // here because of an InterruptedException. Don't let exceptions in
+      // here be cause of test failure, but leave a trace of them.
+      LOG.warn("Ignoring exception while shutting down Mini DFS", e);
+    }
+    try {
+      FileSystem fs = cluster.getFileSystem();
+      if (fs != null) {
+        LOG.info("Shutting down FileSystem");
+        fs.close();
+      }
+      FileSystem.closeAll();
+    } catch (IOException e) {
+      LOG.error("error closing file system", e);
+    }
+  }
+
+  /**
+   * Fails with an AssertionFailedError when Bytes.compareTo reports a
+   * difference between the arrays; the message renders both arrays with
+   * Bytes.toString.
+   *
+   * @param expected expected bytes
+   * @param actual actual bytes
+   */
+  public void assertByteEquals(byte[] expected,
+      byte[] actual) {
+    if (Bytes.compareTo(expected, actual) == 0) {
+      return;
+    }
+    String message = "expected:<" + Bytes.toString(expected)
+        + "> but was:<" + Bytes.toString(actual) + ">";
+    throw new AssertionFailedError(message);
+  }
+
+  /**
+   * Fails with an AssertionFailedError when Bytes.compareTo reports a
+   * difference between the arrays; the message renders both arrays with
+   * Bytes.toStringBinary.
+   *
+   * @param expected expected bytes
+   * @param actual actual bytes
+   */
+  public static void assertEquals(byte[] expected,
+      byte[] actual) {
+    if (Bytes.compareTo(expected, actual) == 0) {
+      return;
+    }
+    String message = "expected:<" + Bytes.toStringBinary(expected)
+        + "> but was:<" + Bytes.toStringBinary(actual) + ">";
+    throw new AssertionFailedError(message);
+  }
+
+}
Copied: incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java (from r1021565, incubator/hama/trunk/src/test/org/apache/hama/bsp/MiniBSPCluster.java)
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java?p2=incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java&p1=incubator/hama/trunk/src/test/org/apache/hama/bsp/MiniBSPCluster.java&r1=1021565&r2=1021596&rev=1021596&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/MiniBSPCluster.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java Tue Oct 12 02:39:13 2010
@@ -1,5 +1,28 @@
-package org.apache.hama.bsp;
+package org.apache.hama;
+
+import java.util.List;
+
+import org.apache.hama.HamaConfiguration;
public class MiniBSPCluster {
+ /**
+ * Placeholder: both arguments are currently ignored and nothing is started.
+ *
+ * @param conf configuration for the cluster (unused for now)
+ * @param groomServers number of groom servers requested (unused for now)
+ */
+ public MiniBSPCluster(HamaConfiguration conf, int groomServers) {
+ // TODO Auto-generated constructor stub
+ }
+
+ /** Placeholder: currently does nothing. */
+ public void shutdown() {
+ // TODO Auto-generated method stub
+
+ }
+
+ /**
+ * Placeholder.
+ *
+ * @return always null for now, so callers have to null-check the result
+ */
+ public List<Thread> getGroomServerThreads() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /**
+ * Placeholder.
+ *
+ * @return always null for now, so callers have to null-check the result
+ */
+ public Thread getMaster() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
Added: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java?rev=1021596&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java Tue Oct 12 02:39:13 2010
@@ -0,0 +1,179 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.util.Bytes;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+public class TestBSPPeer extends HamaCluster implements Watcher {
+ private Log LOG = LogFactory.getLog(TestBSPPeer.class);
+
+ private static final int NUM_PEER = 35;
+ private static final int ROUND = 3;
+ private static final int PAYLOAD = 1024; // 1kb in default
+ List<BSPPeerThread> list = new ArrayList<BSPPeerThread>(NUM_PEER);
+ Configuration conf;
+ private Random r = new Random();
+
+ /** Uses the configuration returned by getConf() for all peers. */
+ public TestBSPPeer() {
+ this.conf = getConf();
+ }
+
+ public void setUp() throws Exception {
+ super.setUp();
+
+ ZooKeeper zk = new ZooKeeper("localhost:21810", 3000, this);
+ Stat s = null;
+ if (zk != null) {
+ try {
+ s = zk.exists(Constants.DEFAULT_ZOOKEEPER_ROOT, false);
+ } catch (Exception e) {
+ LOG.error(s);
+ }
+
+ if (s == null) {
+ try {
+ zk.create(Constants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch (KeeperException e) {
+ LOG.error(e);
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Thread that drives a single BSPPeer through ROUND rounds of
+ * send / sleep / sync / verify.
+ */
+ public class BSPPeerThread extends Thread {
+ private BSPPeer peer;
+ // upper bound (exclusive) of the random part of the sleep time, in seconds
+ private int MAXIMUM_DURATION = 5;
+
+ /**
+ * @param conf configuration handed to the BSPPeer constructor
+ * @throws IOException if the BSPPeer cannot be created
+ */
+ public BSPPeerThread(Configuration conf) throws IOException {
+ this.peer = new BSPPeer(conf);
+ }
+
+ /**
+ * In every round: sends 10 random messages to localhost ports 30000-30009,
+ * sleeps between 5 and 9 seconds, calls sync() on the peer and then checks
+ * the received messages. Exceptions from send and sync are only logged or
+ * printed.
+ */
+ @Override
+ public void run() {
+ int randomTime;
+ byte[] dummyData = new byte[PAYLOAD];
+ BSPMessage msg = null;
+ InetSocketAddress addr = null;
+
+ for (int i = 0; i < ROUND; i++) {
+ randomTime = r.nextInt(MAXIMUM_DURATION) + 5;
+
+ for (int j = 0; j < 10; j++) {
+ r.nextBytes(dummyData);
+ // tag is derived from the payload so the receiver can cross-check it
+ // (presumably the last 128 bytes -- see Bytes.tail)
+ msg = new BSPMessage(Bytes.tail(dummyData, 128), dummyData);
+ addr = new InetSocketAddress("localhost", 30000 + j);
+ try {
+ peer.send(addr, msg);
+ } catch (IOException e) {
+ LOG.info(e);
+ }
+ }
+
+ try {
+ Thread.sleep(randomTime * 1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ try {
+ peer.sync();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (KeeperException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ verifyPayload();
+ }
+ }
+
+ /**
+ * Drains the peer's current messages and asserts for each one that the
+ * 128 tag bytes match the 128 bytes at the end of the data, then clears
+ * the peer's local queue.
+ */
+ private void verifyPayload() {
+ System.out.println("[" + getName() + "] verifying "
+ + peer.localQueue.size() + " messages");
+ BSPMessage msg = null;
+
+ try {
+ while ((msg = peer.getCurrentMessage()) != null) {
+ // NOTE(review): this assertion runs on the peer thread; testSync only
+ // joins the threads, so a failure here will likely not fail the test
+ // -- confirm.
+ assertEquals(Bytes.compareTo(msg.tag, 0, 128, msg.data,
+ msg.data.length - 128, 128), 0);
+ }
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+
+ peer.localQueue.clear();
+ }
+
+ /** @return the peer driven by this thread */
+ public BSPPeer getBSPPeer() {
+ return this.peer;
+ }
+ }
+
+  /**
+   * Creates NUM_PEER peer threads on ports 30000 and up, starts them all and
+   * waits for every one of them to finish.
+   */
+  public void testSync() throws InterruptedException, IOException {
+    conf.setInt("bsp.peers.num", NUM_PEER);
+    conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+    conf.set(Constants.PEER_HOST, "localhost");
+    conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
+
+    for (int peerIndex = 0; peerIndex < NUM_PEER; peerIndex++) {
+      // each peer gets its own port; the peer is built while it is set
+      conf.set(Constants.PEER_PORT, String.valueOf(30000 + peerIndex));
+      list.add(new BSPPeerThread(conf));
+    }
+
+    for (BSPPeerThread peerThread : list) {
+      peerThread.start();
+    }
+
+    for (BSPPeerThread peerThread : list) {
+      peerThread.join();
+    }
+  }
+
+ /** Watcher callback; ZooKeeper events are deliberately ignored here. */
+ @Override
+ public void process(WatchedEvent event) {
+ }
+}
Added: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestClusterStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestClusterStatus.java?rev=1021596&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestClusterStatus.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestClusterStatus.java Tue Oct 12 02:39:13 2010
@@ -0,0 +1,58 @@
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hama.bsp.BSPMaster;
+import org.apache.hama.bsp.ClusterStatus;
+
+public class TestClusterStatus extends TestCase {
+ Random rnd = new Random();
+
+ /** No fixture of its own; only runs the TestCase set-up. */
+ protected void setUp() throws Exception {
+ super.setUp();
+ }
+
+  /**
+   * Writes a ClusterStatus with ten random groom names to a buffer, reads it
+   * back into a second instance and compares groom count, groom names, tasks
+   * and max tasks.
+   */
+  public final void testWriteAndReadFields() throws IOException {
+    DataOutputBuffer out = new DataOutputBuffer();
+    DataInputBuffer in = new DataInputBuffer();
+
+    List<String> groomNames = new ArrayList<String>();
+    for (int n = 0; n < 10; n++) {
+      groomNames.add("groom_" + rnd.nextInt());
+    }
+    int tasks = rnd.nextInt(100);
+    int maxTasks = rnd.nextInt(100);
+
+    ClusterStatus written = new ClusterStatus(groomNames, tasks, maxTasks,
+        BSPMaster.State.RUNNING);
+    written.write(out);
+
+    in.reset(out.getData(), out.getLength());
+
+    ClusterStatus read = new ClusterStatus();
+    read.readFields(in);
+
+    Set<String> writtenNames = new HashSet<String>(
+        written.getActiveGroomNames());
+    Set<String> readNames = new HashSet<String>(read.getActiveGroomNames());
+
+    assertEquals(written.getGroomServers(), read.getGroomServers());
+
+    // same names in both directions, order does not matter
+    assertTrue(writtenNames.containsAll(readNames));
+    assertTrue(readNames.containsAll(writtenNames));
+
+    assertEquals(written.getTasks(), read.getTasks());
+    assertEquals(written.getMaxTasks(), read.getMaxTasks());
+  }
+}
Added: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestSerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestSerializePrinting.java?rev=1021596&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestSerializePrinting.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestSerializePrinting.java Tue Oct 12 02:39:13 2010
@@ -0,0 +1,122 @@
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Serialize Printing of Hello World
+ */
+public class TestSerializePrinting extends HamaCluster implements Watcher {
+ private Log LOG = LogFactory.getLog(TestSerializePrinting.class);
+ private int NUM_PEER = 10;
+ List<BSPPeerThread> list = new ArrayList<BSPPeerThread>(NUM_PEER);
+ List<String> echo = new ArrayList<String>();
+ Configuration conf;
+
+ /** Uses the configuration returned by getConf() for all peers. */
+ public TestSerializePrinting() {
+ this.conf = getConf();
+ }
+
+ public void setUp() throws Exception {
+ super.setUp();
+
+ ZooKeeper zk = new ZooKeeper("localhost:21810", 3000, this);
+ Stat s = null;
+ if (zk != null) {
+ try {
+ s = zk.exists(Constants.DEFAULT_ZOOKEEPER_ROOT, false);
+ } catch (Exception e) {
+ LOG.error(s);
+ }
+
+ if (s == null) {
+ try {
+ zk.create(Constants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch (KeeperException e) {
+ LOG.error(e);
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ }
+ }
+ }
+ }
+
+  /**
+   * Creates NUM_PEER peer threads whose ids (and ports 30000 + id) follow a
+   * fixed shuffled sequence, starts them all and waits for every one of them
+   * to finish.
+   */
+  public void testHelloWorld() throws InterruptedException, IOException {
+    int[] randomSequence = new int[] { 2, 3, 4, 5, 0, 1, 6, 7, 8, 9 };
+    for (int slot = 0; slot < NUM_PEER; slot++) {
+      int peerId = randomSequence[slot];
+      conf.setInt("bsp.peers.num", NUM_PEER);
+      conf.set(Constants.PEER_HOST, "localhost");
+      conf.set(Constants.PEER_PORT, String.valueOf(30000 + peerId));
+      conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
+      BSPPeerThread peerThread = new BSPPeerThread(conf, peerId);
+      System.out.println(peerId + ", " + peerThread.getName());
+      list.add(peerThread);
+    }
+
+    for (BSPPeerThread peerThread : list) {
+      peerThread.start();
+    }
+
+    for (BSPPeerThread peerThread : list) {
+      peerThread.join();
+    }
+  }
+
+  /**
+   * Thread that owns one BSPPeer. It runs NUM_PEER steps; in the step whose
+   * number equals its id it records and prints its name. Every step ends
+   * with a two second sleep followed by a sync() on the peer.
+   */
+  public class BSPPeerThread extends Thread {
+    private BSPPeer peer;
+    private int myId;
+
+    /**
+     * @param conf configuration for the peer; the ZooKeeper quorum is set to
+     *          "localhost" on it before the peer is created
+     * @param myId number of the step in which this thread prints
+     * @throws IOException if the BSPPeer cannot be created
+     */
+    public BSPPeerThread(Configuration conf, int myId) throws IOException {
+      conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+
+      this.peer = new BSPPeer(conf);
+      this.myId = myId;
+    }
+
+    @Override
+    public void run() {
+      for (int step = 0; step < NUM_PEER; step++) {
+        if (step == myId) {
+          echo.add(getName());
+          System.out.println("Hello BSP from " + step + " of " + NUM_PEER + ": "
+              + getName());
+        }
+
+        try {
+          Thread.sleep(2000);
+          peer.sync();
+        } catch (IOException e) {
+          e.printStackTrace();
+        } catch (KeeperException e) {
+          e.printStackTrace();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+
+ /** Watcher callback; ZooKeeper events are currently ignored. */
+ @Override
+ public void process(WatchedEvent event) {
+ // TODO Auto-generated method stub
+
+ }
+}