You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2011/09/27 11:35:48 UTC
svn commit: r1176297 [11/19] - in /incubator/hama/branches/HamaV2: ./ api/
api/target/ api/target/classes/ api/target/classes/META-INF/
api/target/lib/ api/target/maven-archiver/
api/target/maven-shared-archive-resources/ api/target/maven-shared-archiv...
Added: incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/bspmaster.jsp
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/bspmaster.jsp?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/bspmaster.jsp (added)
+++ incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/bspmaster.jsp Tue Sep 27 09:35:21 2011
@@ -0,0 +1,82 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<%@ page contentType="text/html; charset=UTF-8" import="javax.servlet.*"
+ import="javax.servlet.http.*" import="java.io.*" import="java.util.*"
+ import="java.text.DecimalFormat" import="org.apache.hama.bsp.*"
+ import="org.apache.hama.util.*"%>
+<%!private static final long serialVersionUID = 1L;%>
+<%
+ // Per-request setup: fetch the BSPMaster that was stored in the servlet
+ // context under the "bsp.master" attribute and collect the data rendered
+ // further down the page.
+ BSPMaster tracker = (BSPMaster) application
+ .getAttribute("bsp.master");
+ // NOTE(review): the boolean argument presumably asks for a detailed
+ // status (the summary table reads active groom names from it) - confirm
+ // against BSPMaster.getClusterStatus.
+ ClusterStatus status = tracker.getClusterStatus(true);
+ String trackerName = tracker.getBSPMasterName();
+ // Rendered in the "Running Jobs" table below.
+ JobStatus[] runningJobs = tracker.jobsToComplete();
+ // Rendered in the "All Jobs History" table below.
+ JobStatus[] allJobs = tracker.getAllJobs();
+%>
+<%!// Shared formatter for the "Avg. Tasks/Node" column. java.text.DecimalFormat
+  // is not thread-safe, and this page does not declare isThreadSafe="false",
+  // so the container may run it for several requests at once; every use of
+  // the formatter is therefore synchronized on the instance.
+  private static final DecimalFormat percentFormat = new DecimalFormat("##0.00");
+
+  /**
+   * Writes the one-row cluster summary table (groom servers, task capacity,
+   * average tasks per node, blacklisted nodes) followed by a line break.
+   *
+   * @param out     writer of the current JSP response
+   * @param status  cluster status the figures are read from
+   * @param tracker the BSP master; currently not used by this method
+   * @throws IOException if writing to {@code out} fails
+   */
+  public void generateSummaryTable(JspWriter out, ClusterStatus status,
+      BSPMaster tracker) throws IOException {
+    String tasksPerNode;
+    if (status.getGroomServers() > 0) {
+      double average = (double) (status.getMaxTasks())
+          / status.getGroomServers();
+      synchronized (percentFormat) {
+        tasksPerNode = percentFormat.format(average);
+      }
+    } else {
+      // No groom servers: avoid dividing by zero and show a placeholder.
+      tasksPerNode = "-";
+    }
+    out.print("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\">\n"
+        + "<tr>" + "<th>Groom Servers</th><th>BSP Task Capacity</th>"
+        + "<th>Avg. Tasks/Node</th>"
+        + "<th>Blacklisted Nodes</th></tr>\n");
+    // The blacklisted-node count is hard-coded to 0 here.
+    out.print("<tr><td><a href=\"machines.jsp?type=active\">"
+        + status.getActiveGroomNames().size() + "</a></td><td>"
+        + status.getMaxTasks() + "</td><td>" + tasksPerNode
+        + "</td><td><a href=\"machines.jsp?type=blacklisted\">" + 0
+        + "</a>" + "</td></tr></table>\n");
+
+    out.print("<br>");
+  }%>
+
+
+<html>
+<head>
+<title><%=trackerName%> Hama BSP Administration</title>
+<!-- <link rel="stylesheet" type="text/css" href="/static/hadoop.css">-->
+</head>
+<body>
+
+<h1><%=trackerName%> Hama BSP Administration</h1>
+
+<b>State:</b>
+<%=status.getBSPMasterState()%><br>
+<b>Started:</b>
+<%=new Date(tracker.getStartTime())%><br>
+<b>Identifier:</b>
+<%=tracker.getBSPMasterIdentifier()%><br>
+
+<hr>
+<%
+ generateSummaryTable(out, status, tracker);
+%>
+<hr />
+
+<h2 id="running_jobs">Running Jobs</h2>
+<%=BSPServletUtil.generateJobTable("Running", runningJobs,
+ 30, 0)%>
+<hr>
+<h2 id="all_jobs">All Jobs History</h2>
+<%=BSPServletUtil.generateJobTable("All", allJobs,
+ 30, 0)%>
+<%
+ out.println(BSPServletUtil.htmlFooter());
+%>
\ No newline at end of file
Propchange: incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/bspmaster.jsp
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/index.html
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/index.html?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/index.html (added)
+++ incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/index.html Tue Sep 27 09:35:21 2011
@@ -0,0 +1,36 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<meta HTTP-EQUIV="REFRESH" content="0;url=bspmaster.jsp"/>
+<html>
+
+<head>
+<title>Hama Administration</title>
+</head>
+
+<body>
+
+<h1>Hama Administration</h1>
+
+<ul>
+
+<li><a href="bspmaster.jsp">BSPMaster</a></li>
+
+</ul>
+
+</body>
+
+</html>
Propchange: incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/index.html
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/machines.jsp
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/machines.jsp?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/machines.jsp (added)
+++ incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/machines.jsp Tue Sep 27 09:35:21 2011
@@ -0,0 +1,41 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<%@ page contentType="text/html; charset=UTF-8" import="javax.servlet.*"
+ import="javax.servlet.http.*" import="java.io.*" import="java.util.*"
+ import="java.text.DecimalFormat" import="org.apache.hama.bsp.*"
+ import="org.apache.hama.util.*"%>
+<%!private static final long serialVersionUID = 1L;%>
+<%
+ // Per-request setup: fetch the BSPMaster that was stored in the servlet
+ // context under the "bsp.master" attribute.
+ BSPMaster tracker = (BSPMaster) application
+ .getAttribute("bsp.master");
+ // NOTE(review): the meaning of the boolean argument is not visible here -
+ // confirm against BSPMaster.getClusterStatus.
+ ClusterStatus status = tracker.getClusterStatus(true);
+ String trackerName = tracker.getBSPMasterName();
+ // "type" query parameter; bspmaster.jsp links here with type=active and
+ // type=blacklisted. It is null when the parameter is absent and is handed
+ // unchanged to BSPServletUtil.generateGroomsTable below.
+ String type = request.getParameter("type");
+%>
+
+<html>
+
+<title><%=trackerName%> Hama Machine List</title>
+
+<body>
+<h1><a href="bspmaster.jsp"><%=trackerName%></a> Hama Machine List</h1>
+
+<h2>Grooms</h2>
+<%
+ out.println(BSPServletUtil.generateGroomsTable(type, status, tracker));
+ out.println(BSPServletUtil.htmlFooter());
+%>
\ No newline at end of file
Propchange: incubator/hama/branches/HamaV2/core/src/main/webapp/bspmaster/machines.jsp
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaCluster.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaCluster.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaCluster.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaCluster.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,47 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Base class for tests that run on top of the mini clusters brought up by
+ * {@link HamaClusterTestCase}; adds a logger and one configuration object
+ * shared by all subclasses.
+ */
+public abstract class HamaCluster extends HamaClusterTestCase {
+  public static final Log LOG = LogFactory.getLog(HamaCluster.class);
+
+  /**
+   * Configuration shared by every subclass. Within this class and its
+   * subclasses the name {@code conf} resolves to this static field, which
+   * hides the per-instance {@code conf} declared in {@link HamaTestCase}.
+   */
+  protected static final HamaConfiguration conf = new HamaConfiguration();
+
+  /** Creates a test case using the superclass defaults. */
+  public HamaCluster() {
+  }
+
+  /**
+   * @param startDfs passed through to {@link HamaClusterTestCase}
+   */
+  public HamaCluster(boolean startDfs) {
+    super(startDfs);
+  }
+
+  /** Delegates to the cluster start-up performed by the superclass. */
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+  }
+
+  /**
+   * @return the shared static configuration
+   */
+  protected static HamaConfiguration getConf() {
+    return conf;
+  }
+}
Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaCluster.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaClusterTestCase.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaClusterTestCase.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaClusterTestCase.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaClusterTestCase.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.File;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+public abstract class HamaClusterTestCase extends HamaTestCase {
+ public static final Log LOG = LogFactory.getLog(HamaClusterTestCase.class);
+ protected MiniDFSCluster dfsCluster;
+ protected MiniBSPCluster bspCluster;
+ protected MiniZooKeeperCluster zooKeeperCluster;
+ protected boolean startDfs;
+ protected int numOfGroom = 1;
+
+ /** default constructor */
+ public HamaClusterTestCase() {
+ this(false);
+ }
+
+ public HamaClusterTestCase(boolean startDfs) {
+ super();
+ this.startDfs = startDfs;
+ }
+
+ /**
+ * Actually start the MiniBSP instance.
+ */
+ protected void hamaClusterSetup() throws Exception {
+ File testDir = new File(getUnitTestdir(getName()).toString());
+
+ // Note that this is done before we create the MiniHamaCluster because we
+ // need to edit the config to add the ZooKeeper servers.
+ this.zooKeeperCluster = new MiniZooKeeperCluster();
+ int clientPort = this.zooKeeperCluster.startup(testDir);
+ conf.set("hama.zookeeper.property.clientPort", Integer.toString(clientPort));
+ bspCluster = new MiniBSPCluster(this.conf, numOfGroom);
+ bspCluster.startBSPCluster();
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ try {
+ if (this.startDfs) {
+ // This spews a bunch of warnings about missing scheme. TODO: fix.
+ this.dfsCluster = new MiniDFSCluster(0, this.conf, 2, true, true, true,
+ null, null, null, null);
+
+ // mangle the conf so that the fs parameter points to the minidfs we
+ // just started up
+ FileSystem filesystem = dfsCluster.getFileSystem();
+ conf.set("fs.defaultFS", filesystem.getUri().toString());
+ Path parentdir = filesystem.getHomeDirectory();
+
+ filesystem.mkdirs(parentdir);
+ }
+
+ // do the super setup now. if we had done it first, then we would have
+ // gotten our conf all mangled and a local fs started up.
+ super.setUp();
+
+ // start the instance
+ hamaClusterSetup();
+ } catch (Exception e) {
+ if (zooKeeperCluster != null) {
+ zooKeeperCluster.shutdown();
+ }
+ if (dfsCluster != null) {
+ shutdownDfs(dfsCluster);
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ try {
+ if (startDfs) {
+ shutdownDfs(dfsCluster);
+ }
+ bspCluster.shutdown();
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ }
+}
Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaClusterTestCase.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaTestCase.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaTestCase.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaTestCase.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaTestCase.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.AssertionFailedError;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hama.util.Bytes;
+
+/**
+ * JUnit 3 base class for Hama tests. Creates a fresh
+ * {@link HamaConfiguration} per test instance, resolves the file system and
+ * a test directory in setUp(), and removes that directory again in
+ * tearDown() when the local file system is in use.
+ */
+public abstract class HamaTestCase extends TestCase {
+  private static final Log LOG = LogFactory.getLog(HamaTestCase.class);
+
+  /** configuration parameter name for test directory */
+  public static final String TEST_DIRECTORY_KEY = "test.build.data";
+
+  /** True when "fs.defaultFS" is "file:///"; computed in setUp(). */
+  private boolean localfs = false;
+  protected Path testDir = null;
+  protected FileSystem fs = null;
+
+  static {
+    initialize();
+  }
+
+  public volatile HamaConfiguration conf;
+
+  /** constructor */
+  public HamaTestCase() {
+    super();
+    init();
+  }
+
+  /**
+   * @param name the test name handed to {@link TestCase}
+   */
+  public HamaTestCase(String name) {
+    super(name);
+    init();
+  }
+
+  /** Creates the per-instance configuration with the test defaults. */
+  private void init() {
+    conf = new HamaConfiguration();
+    conf.setStrings("bsp.local.dir", "/tmp/hama-test");
+    conf.set("bsp.master.address", "localhost");
+    conf.set("bsp.groom.report.address", "127.0.0.1:0");
+  }
+
+  /**
+   * Note that this method must be called after the mini hdfs cluster has
+   * started or we end up with a local file system.
+   */
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    localfs = "file:///".equals(conf.get("fs.defaultFS", "file:///"));
+
+    if (fs == null) {
+      this.fs = FileSystem.get(conf);
+    }
+    try {
+      if (localfs) {
+        // Start every test from an empty per-test directory.
+        this.testDir = getUnitTestdir(getName());
+        if (fs.exists(testDir)) {
+          fs.delete(testDir, true);
+        }
+      } else {
+        this.testDir =
+            this.fs.makeQualified(new Path("/tmp/hama-test"));
+      }
+    } catch (Exception e) {
+      LOG.fatal("error during setup", e);
+      throw e;
+    }
+  }
+
+  /**
+   * Deletes the per-test directory when running on the local file system.
+   * Failures are logged and do not fail the test.
+   */
+  @Override
+  protected void tearDown() throws Exception {
+    try {
+      // fs/testDir stay null when setUp() failed early; skip the cleanup
+      // then instead of logging a spurious NullPointerException.
+      if (localfs && this.fs != null && testDir != null) {
+        if (this.fs.exists(testDir)) {
+          this.fs.delete(testDir, true);
+        }
+      }
+    } catch (Exception e) {
+      LOG.fatal("error during tear down", e);
+    }
+    super.tearDown();
+  }
+
+  /**
+   * @param testName name of the running test
+   * @return the directory for that test below the value configured under
+   *         TEST_DIRECTORY_KEY in conf, or below "/tmp/hama-test/build/data"
+   *         when the key is not set
+   */
+  protected Path getUnitTestdir(String testName) {
+    return new Path(
+        conf.get(TEST_DIRECTORY_KEY, "/tmp/hama-test/build/data"), testName);
+  }
+
+  /**
+   * Initializes parameters used in the test environment:
+   *
+   * Sets the system property TEST_DIRECTORY_KEY to the absolute path of
+   * "build/hama/test" if it is not already set.
+   *
+   * NOTE(review): getUnitTestdir() looks the key up in the configuration,
+   * not in the system properties - confirm that the value set here is
+   * actually visible there.
+   */
+  public static void initialize() {
+    if (System.getProperty(TEST_DIRECTORY_KEY) == null) {
+      System.setProperty(TEST_DIRECTORY_KEY, new File(
+          "build/hama/test").getAbsolutePath());
+    }
+  }
+
+  /**
+   * Common method to close down a MiniDFSCluster and the associated file system
+   *
+   * @param cluster the cluster to stop; null is ignored
+   */
+  public static void shutdownDfs(MiniDFSCluster cluster) {
+    if (cluster != null) {
+      LOG.info("Shutting down Mini DFS ");
+      try {
+        cluster.shutdown();
+      } catch (Exception e) {
+        // Can get a java.lang.reflect.UndeclaredThrowableException thrown
+        // here because of an InterruptedException. Don't let exceptions in
+        // here be cause of test failure - but do leave a trace of them.
+        LOG.warn("ignoring exception while shutting down Mini DFS", e);
+      }
+      try {
+        FileSystem fs = cluster.getFileSystem();
+        if (fs != null) {
+          LOG.info("Shutting down FileSystem");
+          fs.close();
+        }
+        FileSystem.closeAll();
+      } catch (IOException e) {
+        LOG.error("error closing file system", e);
+      }
+    }
+  }
+
+  /**
+   * Fails with an AssertionFailedError when the two arrays differ according
+   * to Bytes.compareTo; the message renders both with Bytes.toString.
+   *
+   * @param expected expected bytes
+   * @param actual actual bytes
+   */
+  public void assertByteEquals(byte[] expected,
+      byte[] actual) {
+    if (Bytes.compareTo(expected, actual) != 0) {
+      throw new AssertionFailedError("expected:<" +
+          Bytes.toString(expected) + "> but was:<" +
+          Bytes.toString(actual) + ">");
+    }
+  }
+
+  /**
+   * Fails with an AssertionFailedError when the two arrays differ according
+   * to Bytes.compareTo; the message renders both with Bytes.toStringBinary.
+   *
+   * @param expected expected bytes
+   * @param actual actual bytes
+   */
+  public static void assertEquals(byte[] expected,
+      byte[] actual) {
+    if (Bytes.compareTo(expected, actual) != 0) {
+      throw new AssertionFailedError("expected:<" +
+          Bytes.toStringBinary(expected) + "> but was:<" +
+          Bytes.toStringBinary(actual) + ">");
+    }
+  }
+
+}
Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/HamaTestCase.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/MiniBSPCluster.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/MiniBSPCluster.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/MiniBSPCluster.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/MiniBSPCluster.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.IOException;
+
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import static java.util.concurrent.TimeUnit.*;
+
+import static junit.framework.Assert.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hama.bsp.BSPMaster;
+import org.apache.hama.bsp.GroomServer;
+import org.apache.hama.HamaConfiguration;
+
+
+public class MiniBSPCluster {
+
+ public static final Log LOG = LogFactory.getLog(MiniBSPCluster.class);
+
+ private ScheduledExecutorService scheduler;
+
+ private HamaConfiguration configuration;
+ private BSPMasterRunner master;
+ private List<GroomServerRunner> groomServerList =
+ new CopyOnWriteArrayList<GroomServerRunner>();
+ private int grooms;
+
+ public class BSPMasterRunner implements Runnable{
+ BSPMaster bspm;
+ HamaConfiguration conf;
+
+ public BSPMasterRunner(HamaConfiguration conf){
+ this.conf = conf;
+ if(null == this.conf)
+ throw new NullPointerException("No Configuration for BSPMaster.");
+ }
+
+ public void run(){
+ try{
+ LOG.info("Starting BSP Master.");
+ this.bspm = BSPMaster.startMaster(this.conf);
+ this.bspm.offerService();
+ }catch(IOException ioe){
+ LOG.error("Fail to startup BSP Master.", ioe);
+ }catch(InterruptedException ie){
+ LOG.error("BSP Master fails in offerService().", ie);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public void shutdown(){
+ if(null != this.bspm) this.bspm.shutdown();
+ }
+
+ public boolean isRunning(){
+ if(null == this.bspm) return false;
+
+ if(this.bspm.currentState().equals(BSPMaster.State.RUNNING)){
+ return true;
+ }
+ return false;
+ }
+
+ public BSPMaster getMaster(){
+ return this.bspm;
+ }
+ }
+
+ public class GroomServerRunner implements Runnable{
+ GroomServer gs;
+ HamaConfiguration conf;
+
+ public GroomServerRunner(HamaConfiguration conf){
+ this.conf = conf;
+ }
+
+ public void run(){
+ try{
+ this.gs = GroomServer.constructGroomServer(GroomServer.class, conf);
+ GroomServer.startGroomServer(this.gs).join();
+ }catch(InterruptedException ie){
+ LOG.error("Fail to start GroomServer. ", ie);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public void shutdown(){
+ try{
+ if(null != this.gs) this.gs.shutdown();
+ }catch(IOException ioe){
+ LOG.info("Fail to shutdown GroomServer.", ioe);
+ }
+ }
+
+ public boolean isRunning(){
+ if(null == this.gs) return false;
+ return this.gs.isRunning();
+ }
+
+ public GroomServer getGroomServer(){
+ return this.gs;
+ }
+ }
+
+ public MiniBSPCluster(HamaConfiguration conf, int groomServers) {
+ this.configuration = conf;
+ this.grooms = groomServers;
+ if(1 > this.grooms) {
+ this.grooms = 2;
+ }
+ LOG.info("Groom server number "+this.grooms);
+ int threadpool = conf.getInt("bsp.test.threadpool", 10);
+ LOG.info("Thread pool value "+threadpool);
+ scheduler = Executors.newScheduledThreadPool(threadpool);
+ }
+
+ public void startBSPCluster(){
+ startMaster();
+ startGroomServers();
+ }
+
+ public void shutdownBSPCluster(){
+ if(null != this.master && this.master.isRunning())
+ this.master.shutdown();
+ if(0 < groomServerList.size()){
+ for(GroomServerRunner groom: groomServerList){
+ if(groom.isRunning()) groom.shutdown();
+ }
+ }
+ }
+
+
+ public void startMaster(){
+ if(null == this.scheduler)
+ throw new NullPointerException("No ScheduledExecutorService exists.");
+ this.master = new BSPMasterRunner(this.configuration);
+ scheduler.schedule(this.master, 0, SECONDS);
+ }
+
+ public void startGroomServers(){
+ if(null == this.scheduler)
+ throw new NullPointerException("No ScheduledExecutorService exists.");
+ if(null == this.master)
+ throw new NullPointerException("No BSPMaster exists.");
+ int cnt=0;
+ while(!this.master.isRunning()){
+ LOG.info("Waiting BSPMaster up.");
+ try{
+ Thread.sleep(1000);
+ cnt++;
+ if(100 < cnt){
+ fail("Fail to launch BSPMaster.");
+ }
+ }catch(InterruptedException ie){
+ LOG.error("Fail to check BSP Master's state.", ie);
+ Thread.currentThread().interrupt();
+ }
+ }
+ for(int i=0; i < this.grooms; i++){
+ HamaConfiguration c = new HamaConfiguration(this.configuration);
+ randomPort(c);
+ GroomServerRunner gsr = new GroomServerRunner(c);
+ groomServerList.add(gsr);
+ scheduler.schedule(gsr, 0, SECONDS);
+ cnt = 0;
+ while(!gsr.isRunning()){
+ LOG.info("Waitin for GroomServer up.");
+ try{
+ Thread.sleep(1000);
+ cnt++;
+ if(10 < cnt){
+ fail("Fail to launch groom server.");
+ }
+ }catch(InterruptedException ie){
+ LOG.error("Fail to check Groom Server's state.", ie);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ }
+
+ private void randomPort(HamaConfiguration conf){
+ try{
+ ServerSocket skt = new ServerSocket(0);
+ int p = skt.getLocalPort();
+ skt.close();
+ conf.set(Constants.PEER_PORT, new Integer(p).toString());
+ conf.setInt(Constants.GROOM_RPC_PORT, p+100);
+ }catch(IOException ioe){
+ LOG.error("Can not find a free port for BSPPeer.", ioe);
+ }
+ }
+
+ public void shutdown() {
+ shutdownBSPCluster();
+ scheduler.shutdown();
+ }
+
+ public List<Thread> getGroomServerThreads() {
+ List<Thread> list = new ArrayList<Thread>();
+ for(GroomServerRunner gsr: groomServerList){
+ list.add(new Thread(gsr));
+ }
+ return list;
+ }
+
+ public Thread getMaster() {
+ return new Thread(this.master);
+ }
+
+ public List<GroomServer> getGroomServers(){
+ List<GroomServer> list = new ArrayList<GroomServer>();
+ for(GroomServerRunner gsr: groomServerList){
+ list.add(gsr.getGroomServer());
+ }
+ return list;
+ }
+
+ public BSPMaster getBSPMaster(){
+ if(null != this.master)
+ return this.master.getMaster();
+ return null;
+ }
+
+ public ScheduledExecutorService getScheduler(){
+ return this.scheduler;
+ }
+}
Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/MiniBSPCluster.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.BSPPeer.BSPSerializableMessage;
+
+public final class BSPSerializerWrapper {
+
+ private final BSPPeer.BSPMessageSerializer serializer;
+
+ public BSPSerializerWrapper(Configuration conf, int port) throws IOException {
+ this.serializer = new BSPPeer(conf, null, null).new BSPMessageSerializer(
+ conf.getInt("bsp.checkpoint.port", port));
+ }
+
+ public final void serialize(final BSPSerializableMessage tmp)
+ throws IOException {
+ this.serializer.serialize(tmp);
+ }
+}
Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,95 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+
+public class TestBSPMasterGroomServer extends HamaCluster {
+
+ private static Log LOG = LogFactory.getLog(TestBSPMasterGroomServer.class);
+ private static String TMP_OUTPUT = "/tmp/test-example/";
+ private HamaConfiguration configuration;
+ private String TEST_JOB = "src/test/java/testjar/testjob.jar";
+
+ public TestBSPMasterGroomServer() {
+ configuration = new HamaConfiguration();
+ configuration.set("bsp.master.address", "localhost");
+ assertEquals("Make sure master addr is set to localhost:", "localhost",
+ configuration.get("bsp.master.address"));
+ configuration.setStrings("bsp.local.dir", "/tmp/hama-test");
+ System.setProperty("hama.log.dir", "/tmp/hama-test/logs");
+ configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+ configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
+ }
+
+ public void setUp() throws Exception {
+ super.setUp();
+ }
+
+ public void testSubmitJob() throws Exception {
+ BSPJob bsp = new BSPJob(configuration);
+ bsp.setJobName("Test Serialize Printing");
+ bsp.setBspClass(testjar.ClassSerializePrinting.HelloBSP.class);
+ bsp.setJar(System.getProperty("user.dir") + "/" + TEST_JOB);
+
+ // Set the task size as a number of GroomServer
+ BSPJobClient jobClient = new BSPJobClient(configuration);
+ configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);
+ ClusterStatus cluster = jobClient.getClusterStatus(false);
+ assertEquals(this.numOfGroom, cluster.getGroomServers());
+ bsp.setNumBspTask(2);
+
+ FileSystem fileSys = FileSystem.get(conf);
+
+ if (bsp.waitForCompletion(true)) {
+ checkOutput(fileSys, cluster, conf);
+ }
+ LOG.info("Client finishes execution job.");
+ }
+
+ private static void checkOutput(FileSystem fileSys, ClusterStatus cluster,
+ HamaConfiguration conf) throws Exception {
+ for (int i = 0; i < 2; i++) {
+ SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(
+ TMP_OUTPUT + i), conf);
+ LongWritable timestamp = new LongWritable();
+ Text message = new Text();
+ reader.next(timestamp, message);
+
+ LOG.info("output: " + message);
+ assertTrue("Check if `Hello BSP' gets printed.", message.toString()
+ .indexOf("Hello BSP from") >= 0);
+ reader.close();
+ }
+ }
+
+ public void tearDown() throws Exception {
+ super.tearDown();
+ }
+}
Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,81 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import junit.framework.TestCase;
+
+/**
+ * Round-trips {@link BSPMessageBundle} instances through their
+ * {@code write}/{@code readFields} methods and compares the contents.
+ */
+public class TestBSPMessageBundle extends TestCase {
+
+  /** An empty bundle must deserialize to a bundle without messages. */
+  public void testEmpty() throws IOException {
+    BSPMessageBundle readBundle = roundTrip(new BSPMessageBundle());
+    assertEquals(0, readBundle.getMessages().size());
+  }
+
+  /** Sixteen byte messages must survive a round trip unchanged and in order. */
+  public void testSerializationDeserialization() throws IOException {
+    BSPMessageBundle bundle = new BSPMessageBundle();
+    ByteMessage[] testMessages = new ByteMessage[16];
+    for (int i = 0; i < testMessages.length; ++i) {
+      // Create a one byte tag containing the number of the message.
+      byte[] tag = new byte[] { (byte) i };
+      // Create a four bytes data part containing serialized number of the
+      // message. writeInt emits four bytes; a bare write(i) emits only one.
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      DataOutputStream dataOut = new DataOutputStream(baos);
+      dataOut.writeInt(i);
+      dataOut.close();
+      byte[] data = baos.toByteArray();
+      testMessages[i] = new ByteMessage(tag, data);
+      bundle.addMessage(testMessages[i]);
+    }
+
+    BSPMessageBundle readBundle = roundTrip(bundle);
+
+    // Check the count first so surplus messages fail as an assertion rather
+    // than as an ArrayIndexOutOfBoundsException in the loop below.
+    assertEquals(testMessages.length, readBundle.getMessages().size());
+    int messageNumber = 0;
+    for (BSPMessage message : readBundle.getMessages()) {
+      ByteMessage byteMessage = (ByteMessage) message;
+      assertTrue(Arrays.equals(testMessages[messageNumber].getTag(),
+          byteMessage.getTag()));
+      assertTrue(Arrays.equals(testMessages[messageNumber].getData(),
+          byteMessage.getData()));
+      ++messageNumber;
+    }
+    assertEquals(testMessages.length, messageNumber);
+  }
+
+  /**
+   * Serializes the bundle into a byte array and reads it back into a new
+   * instance.
+   *
+   * @param bundle bundle to serialize
+   * @return a new bundle populated from the serialized bytes
+   * @throws IOException if writing or reading fails
+   */
+  private static BSPMessageBundle roundTrip(BSPMessageBundle bundle)
+      throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(baos);
+    bundle.write(out);
+    out.close();
+
+    BSPMessageBundle readBundle = new BSPMessageBundle();
+    readBundle.readFields(new DataInputStream(new ByteArrayInputStream(baos
+        .toByteArray())));
+    return readBundle;
+  }
+}
Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+
+/**
+ * Verifies that a {@link ClusterStatus} written with {@code write} is
+ * restored by {@code readFields} with the same grooms and task counters.
+ */
+public class TestClusterStatus extends TestCase {
+  /** Failure count given to every groom status built by the test. */
+  private static final int GROOM_FAILURES = 25;
+  /** Max-task value given to every groom status built by the test. */
+  private static final int GROOM_MAX_TASKS = 2;
+
+  Random rnd = new Random();
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+  }
+
+  /**
+   * Serializes a status holding ten grooms and compares the deserialized
+   * copy with the original.
+   */
+  public final void testWriteAndReadFields() throws IOException {
+    DataOutputBuffer out = new DataOutputBuffer();
+    DataInputBuffer in = new DataInputBuffer();
+
+    Map<String, GroomServerStatus> grooms = new HashMap<String, GroomServerStatus>();
+    for (int i = 0; i < 10; i++) {
+      int num = rnd.nextInt();
+      String groomName = "groom_" + num;
+      String peerName = "peerhost:" + num;
+      grooms.put(groomName, new GroomServerStatus(peerName,
+          new ArrayList<TaskStatus>(0), GROOM_FAILURES, GROOM_MAX_TASKS));
+    }
+
+    int tasks = rnd.nextInt(100);
+    int maxTasks = rnd.nextInt(100);
+    BSPMaster.State state = BSPMaster.State.RUNNING;
+
+    ClusterStatus status1 = new ClusterStatus(grooms, tasks, maxTasks, state);
+    status1.write(out);
+
+    in.reset(out.getData(), out.getLength());
+
+    ClusterStatus status2 = new ClusterStatus();
+    status2.readFields(in);
+
+    // JUnit expects (expected, actual); the reverse order produces
+    // misleading failure messages.
+    for (Entry<String, GroomServerStatus> entry : status2
+        .getActiveGroomServerStatus().entrySet()) {
+      assertEquals(GROOM_MAX_TASKS, entry.getValue().getMaxTasks());
+      assertEquals(GROOM_FAILURES, entry.getValue().getFailures());
+    }
+
+    Map<String, String> writtenGrooms = new HashMap<String, String>(
+        status1.getActiveGroomNames());
+    Map<String, String> readGrooms = new HashMap<String, String>(
+        status2.getActiveGroomNames());
+
+    assertEquals(status1.getGroomServers(), status2.getGroomServers());
+
+    // Containment in both directions: both sides hold the same entries.
+    assertTrue(writtenGrooms.entrySet().containsAll(readGrooms.entrySet()));
+    assertTrue(readGrooms.entrySet().containsAll(writtenGrooms.entrySet()));
+
+    assertEquals(status1.getTasks(), status2.getTasks());
+    assertEquals(status1.getMaxTasks(), status2.getMaxTasks());
+  }
+}
Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestMessages.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestMessages.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestMessages.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestMessages.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import junit.framework.TestCase;
+
+import org.apache.hama.util.Bytes;
+
+/**
+ * Checks that {@link ByteMessage} hands back the tag and data it was built
+ * with, including for a very large data array.
+ */
+public class TestMessages extends TestCase {
+
+  /** Largest array length requested; stays below {@code Integer.MAX_VALUE}. */
+  private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+  public void testByteMessage() {
+    // Aim for 60% of the maximum heap, but clamp to a legal array length:
+    // on heaps above roughly 3.5 GiB (or when maxMemory() reports no limit)
+    // a plain (int) cast saturates at Integer.MAX_VALUE and the allocation
+    // is rejected by the VM.
+    long wanted = (long) (Runtime.getRuntime().maxMemory() * 0.60);
+    int dataSize = (int) Math.min(wanted, (long) MAX_ARRAY_SIZE);
+    ByteMessage msg = new ByteMessage(Bytes.toBytes("tag"), new byte[dataSize]);
+    assertEquals(dataSize, msg.getData().length);
+    // Drop the reference so the large buffer can be reclaimed.
+    msg = null;
+
+    // The tag is the last 128 bytes of the data, so comparing the tag with
+    // the tail of the data must report equality (0).
+    byte[] dummyData = new byte[1024];
+    ByteMessage msg2 = new ByteMessage(Bytes.tail(dummyData, 128), dummyData);
+    assertEquals(0,
+        Bytes.compareTo(msg2.getTag(), 0, 128, msg2.getData(),
+            msg2.getData().length - 128, 128));
+  }
+}
Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/TestMessages.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.checkpoint;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPMessage;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.BSPSerializerWrapper;
+import org.apache.hama.bsp.DoubleMessage;
+import org.apache.hama.bsp.BSPPeer.BSPSerializableMessage;
+
+/**
+ * Starts a {@link CheckpointRunner}, sends it one message bundle through
+ * {@link BSPSerializerWrapper} and verifies that the bundle can be read back
+ * from the file system.
+ */
+public class TestCheckpoint extends TestCase {
+
+  public static final Log LOG = LogFactory.getLog(TestCheckpoint.class);
+
+  private CheckpointRunner runner;
+  private BSPSerializerWrapper serializer;
+  static final String TEST_STRING = "Test String";
+  private FileSystem hdfs;
+  static final DoubleMessage estimate =
+      new DoubleMessage("192.168.1.123:61000", 3.1415926d);
+
+  @Override
+  public void setUp() throws Exception {
+    Configuration conf = new HamaConfiguration();
+    this.hdfs = FileSystem.get(conf);
+    assertNotNull("File system object should exist.", this.hdfs);
+    // Remove leftovers of an earlier run so the existence check in
+    // testCheckpoint() cannot be satisfied by a stale file.
+    this.hdfs.delete(new Path(checkpointedPath()), false);
+    this.runner =
+        new CheckpointRunner(CheckpointRunner.buildCommands(conf));
+    assertNotNull("Checkpoint instance should exist.", this.runner);
+    this.runner.start();
+    // Give the checkpoint process a moment to come up.
+    Thread.sleep(1000*1);
+    Process process = this.runner.getProcess();
+    assertNotNull("Checkpoint process should be created.", process);
+    this.serializer = new BSPSerializerWrapper(conf,
+        Integer.parseInt(CheckpointRunner.DEFAULT_PORT));
+  }
+
+  /** Builds a bundle holding only {@link #estimate}. */
+  private BSPMessageBundle createMessageBundle() {
+    BSPMessageBundle bundle = new BSPMessageBundle();
+    bundle.addMessage(estimate);
+    return bundle;
+  }
+
+  /** Path sent with the message and later checked on the file system. */
+  private String checkpointedPath() {
+    return "/tmp/" + "job_201108221205_000" + "/" + "0" +
+        "/" + "attempt_201108221205_0001_000000_0";
+  }
+
+  /**
+   * Sends one bundle to the checkpoint process and checks that the file at
+   * {@link #checkpointedPath()} holds exactly that message.
+   */
+  public void testCheckpoint() throws Exception {
+    this.serializer.serialize(new BSPSerializableMessage(
+        checkpointedPath(), createMessageBundle()));
+    // Wait for the checkpoint process to write the file.
+    Thread.sleep(1000);
+    Path path = new Path(checkpointedPath());
+    boolean exists = this.hdfs.exists(path);
+    assertTrue("Check if file is actually written to hdfs.", exists);
+
+    BSPMessageBundle bundle = new BSPMessageBundle();
+    DataInputStream stream = new DataInputStream(this.hdfs.open(path));
+    // Close the stream even if deserialization fails.
+    try {
+      DataInput in = stream;
+      bundle.readFields(in);
+    } finally {
+      stream.close();
+    }
+
+    List<BSPMessage> messages = bundle.getMessages();
+    assertEquals("Only one message exists.", 1, messages.size());
+    for(BSPMessage message: messages) {
+      // JUnit expects (message, expected, actual).
+      String peer = (String)message.getTag();
+      assertEquals("BSPPeer value in form of <ip>:<port>.", estimate.getTag(), peer);
+      Double pi = (Double)message.getData();
+      assertEquals("Message content.", estimate.getData(), pi);
+    }
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    // The runner is unset when setUp() failed before creating it.
+    if (this.runner != null) {
+      this.runner.stop();
+    }
+  }
+
+}
Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/ipc/TestIPC.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/ipc/TestIPC.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/ipc/TestIPC.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Exercises {@link Client} and {@link Server} with echo servers: serial
+ * calls to one server, parallel calls to several servers, and a call to an
+ * address that is expected to fail.
+ */
+public class TestIPC extends TestCase {
+  public static final Log LOG = LogFactory.getLog(TestIPC.class);
+
+  final private static Configuration conf = new Configuration();
+  final static private int PING_INTERVAL = 1000;
+
+  static {
+    Client.setPingInterval(conf, PING_INTERVAL);
+  }
+
+  public TestIPC(String name) {
+    super(name);
+  }
+
+  private static final Random RANDOM = new Random();
+
+  private static final String ADDRESS = "0.0.0.0";
+
+  /** Server that echoes every parameter, optionally after a random sleep. */
+  private static class TestServer extends Server {
+    private final boolean sleep;
+
+    public TestServer(int handlerCount, boolean sleep) throws IOException {
+      super(ADDRESS, 0, LongWritable.class, handlerCount, conf);
+      this.sleep = sleep;
+    }
+
+    @Override
+    public Writable call(Class<?> protocol, Writable param, long receiveTime)
+        throws IOException {
+      if (sleep) {
+        try {
+          Thread.sleep(RANDOM.nextInt(2 * PING_INTERVAL)); // sleep a bit
+        } catch (InterruptedException ignored) {
+          // Deliberately ignored: an interrupted nap only shortens the
+          // simulated delay; the parameter is still echoed below.
+        }
+      }
+      return param; // echo param as result
+    }
+  }
+
+  /** Thread issuing {@code count} single calls against one server. */
+  private static class SerialCaller extends Thread {
+    private final Client client;
+    private final InetSocketAddress server;
+    private final int count;
+    // Read by the test thread only after join().
+    private boolean failed;
+
+    public SerialCaller(Client client, InetSocketAddress server, int count) {
+      this.client = client;
+      this.server = server;
+      this.count = count;
+    }
+
+    @Override
+    public void run() {
+      for (int i = 0; i < count; i++) {
+        try {
+          LongWritable param = new LongWritable(RANDOM.nextLong());
+          LongWritable value = (LongWritable) client.call(param, server, null,
+              null);
+          if (!param.equals(value)) {
+            LOG.fatal("Call failed!");
+            failed = true;
+            break;
+          }
+        } catch (Exception e) {
+          LOG.fatal("Caught: " + StringUtils.stringifyException(e));
+          failed = true;
+        }
+      }
+    }
+  }
+
+  /** Thread issuing {@code count} multi-address calls. */
+  private static class ParallelCaller extends Thread {
+    private final Client client;
+    private final int count;
+    private final InetSocketAddress[] addresses;
+    // Read by the test thread only after join().
+    private boolean failed;
+
+    public ParallelCaller(Client client, InetSocketAddress[] addresses,
+        int count) {
+      this.client = client;
+      this.addresses = addresses;
+      this.count = count;
+    }
+
+    @Override
+    public void run() {
+      for (int i = 0; i < count; i++) {
+        try {
+          Writable[] params = new Writable[addresses.length];
+          for (int j = 0; j < addresses.length; j++)
+            params[j] = new LongWritable(RANDOM.nextLong());
+          Writable[] values = client.call(params, addresses, null, null);
+          for (int j = 0; j < addresses.length; j++) {
+            if (!params[j].equals(values[j])) {
+              LOG.fatal("Call failed!");
+              failed = true;
+              break;
+            }
+          }
+        } catch (Exception e) {
+          LOG.fatal("Caught: " + StringUtils.stringifyException(e));
+          failed = true;
+        }
+      }
+    }
+  }
+
+  /** Stops every client that was actually created. */
+  private static void stopClients(Client[] clients) {
+    for (Client client : clients) {
+      if (client != null) {
+        client.stop();
+      }
+    }
+  }
+
+  /** Stops every server that was actually created. */
+  private static void stopServers(Server[] servers) {
+    for (Server server : servers) {
+      if (server != null) {
+        server.stop();
+      }
+    }
+  }
+
+  public void testSerial() throws Exception {
+    testSerial(3, false, 2, 5, 100);
+  }
+
+  /**
+   * Runs {@code callerCount} serial callers, spread over {@code clientCount}
+   * clients, against a single echo server.
+   *
+   * @param handlerCount number of server handlers
+   * @param handlerSleep whether the server sleeps before echoing
+   * @param clientCount number of clients shared by the callers
+   * @param callerCount number of caller threads
+   * @param callCount calls issued by each caller
+   */
+  public void testSerial(int handlerCount, boolean handlerSleep,
+      int clientCount, int callerCount, int callCount) throws Exception {
+    Server server = new TestServer(handlerCount, handlerSleep);
+    InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    server.start();
+
+    Client[] clients = new Client[clientCount];
+    // Release clients and server even when a caller reports a failure.
+    try {
+      for (int i = 0; i < clientCount; i++) {
+        clients[i] = new Client(LongWritable.class, conf);
+      }
+
+      SerialCaller[] callers = new SerialCaller[callerCount];
+      for (int i = 0; i < callerCount; i++) {
+        callers[i] = new SerialCaller(clients[i % clientCount], addr, callCount);
+        callers[i].start();
+      }
+      for (int i = 0; i < callerCount; i++) {
+        callers[i].join();
+        assertFalse(callers[i].failed);
+      }
+    } finally {
+      try {
+        stopClients(clients);
+      } finally {
+        server.stop();
+      }
+    }
+  }
+
+  public void testParallel() throws Exception {
+    testParallel(10, false, 2, 4, 2, 4, 100);
+  }
+
+  /**
+   * Runs {@code callerCount} parallel callers against {@code addressCount}
+   * addresses that map round-robin onto {@code serverCount} echo servers.
+   *
+   * @param handlerCount number of handlers per server
+   * @param handlerSleep whether the servers sleep before echoing
+   * @param serverCount number of servers to start
+   * @param addressCount number of target addresses per call
+   * @param clientCount number of clients shared by the callers
+   * @param callerCount number of caller threads
+   * @param callCount calls issued by each caller
+   */
+  public void testParallel(int handlerCount, boolean handlerSleep,
+      int serverCount, int addressCount, int clientCount, int callerCount,
+      int callCount) throws Exception {
+    Server[] servers = new Server[serverCount];
+    Client[] clients = new Client[clientCount];
+    // Release clients and servers even when setup or an assertion fails.
+    try {
+      for (int i = 0; i < serverCount; i++) {
+        servers[i] = new TestServer(handlerCount, handlerSleep);
+        servers[i].start();
+      }
+
+      InetSocketAddress[] addresses = new InetSocketAddress[addressCount];
+      for (int i = 0; i < addressCount; i++) {
+        addresses[i] = NetUtils.getConnectAddress(servers[i % serverCount]);
+      }
+
+      for (int i = 0; i < clientCount; i++) {
+        clients[i] = new Client(LongWritable.class, conf);
+      }
+
+      ParallelCaller[] callers = new ParallelCaller[callerCount];
+      for (int i = 0; i < callerCount; i++) {
+        callers[i] = new ParallelCaller(clients[i % clientCount], addresses,
+            callCount);
+        callers[i].start();
+      }
+      for (int i = 0; i < callerCount; i++) {
+        callers[i].join();
+        assertFalse(callers[i].failed);
+      }
+    } finally {
+      try {
+        stopClients(clients);
+      } finally {
+        stopServers(servers);
+      }
+    }
+  }
+
+  /**
+   * Calls an address where the call is expected to fail and checks that the
+   * resulting {@link IOException} names the address and carries the nested
+   * cause's message.
+   */
+  public void testStandAloneClient() throws Exception {
+    testParallel(10, false, 2, 4, 2, 4, 100);
+    Client client = new Client(LongWritable.class, conf);
+    InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
+    try {
+      client.call(new LongWritable(RANDOM.nextLong()), address, null, null);
+      fail("Expected an exception to have been thrown");
+    } catch (IOException e) {
+      String message = e.getMessage();
+      String addressText = address.toString();
+      assertTrue("Did not find " + addressText + " in " + message, message
+          .contains(addressText));
+      Throwable cause = e.getCause();
+      assertNotNull("No nested exception in " + e, cause);
+      String causeText = cause.getMessage();
+      assertTrue("Did not find " + causeText + " in " + message, message
+          .contains(causeText));
+    } finally {
+      // Release the client's resources whatever the outcome.
+      client.stop();
+    }
+  }
+
+}
Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/ipc/TestIPC.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/ipc/TestRPC.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/ipc/TestRPC.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/ipc/TestRPC.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+import junit.framework.TestCase;
+
+/**
+ * Starts an {@link RPC} server backed by {@link TestImpl} and exercises it
+ * through a proxy and through {@code RPC.call} multi-calls.
+ */
+public class TestRPC extends TestCase {
+  private static final int PORT = 1234;
+  private static final String ADDRESS = "0.0.0.0";
+
+  // Named after this class; the previous category string pointed at
+  // org.apache.hadoop.ipc.TestRPC.
+  public static final Log LOG = LogFactory.getLog(TestRPC.class);
+
+  private static Configuration conf = new Configuration();
+
+  public TestRPC(String name) {
+    super(name);
+  }
+
+  /** Protocol served by {@link TestImpl} and called through the proxy. */
+  public interface TestProtocol extends VersionedProtocol {
+    public static final long versionID = 1L;
+
+    void ping() throws IOException;
+
+    String echo(String value) throws IOException;
+
+    String[] echo(String[] value) throws IOException;
+
+    Writable echo(Writable value) throws IOException;
+
+    int add(int v1, int v2) throws IOException;
+
+    int add(int[] values) throws IOException;
+
+    int error() throws IOException;
+
+    void testServerGet() throws IOException;
+  }
+
+  /** Server-side implementation: echoes, adds, and fails on demand. */
+  public class TestImpl implements TestProtocol {
+
+    public long getProtocolVersion(String protocol, long clientVersion) {
+      return TestProtocol.versionID;
+    }
+
+    public void ping() {
+    }
+
+    public String echo(String value) throws IOException {
+      return value;
+    }
+
+    public String[] echo(String[] values) throws IOException {
+      return values;
+    }
+
+    public Writable echo(Writable writable) {
+      return writable;
+    }
+
+    public int add(int v1, int v2) {
+      return v1 + v2;
+    }
+
+    public int add(int[] values) {
+      int sum = 0;
+      for (int i = 0; i < values.length; i++) {
+        sum += values[i];
+      }
+      return sum;
+    }
+
+    /** Always throws, so the test can check exception propagation. */
+    public int error() throws IOException {
+      throw new IOException("bobo");
+    }
+
+    /** Fails unless {@code Server.get()} returns an {@link RPC.Server}. */
+    public void testServerGet() throws IOException {
+      if (!(Server.get() instanceof RPC.Server)) {
+        throw new IOException("Server.get() failed");
+      }
+    }
+
+  }
+
+  /**
+   * Calls every protocol method through a proxy, then issues two multi-calls.
+   * The proxy and the server are released even when an assertion fails.
+   */
+  public void testCalls() throws Exception {
+    Server server = RPC.getServer(new TestImpl(), ADDRESS, PORT, conf);
+    TestProtocol proxy = null;
+    try {
+      server.start();
+
+      InetSocketAddress addr = new InetSocketAddress(PORT);
+      proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
+          TestProtocol.versionID, addr, conf);
+
+      proxy.ping();
+
+      // JUnit expects (expected, actual).
+      String stringResult = proxy.echo("foo");
+      assertEquals("foo", stringResult);
+
+      stringResult = proxy.echo((String) null);
+      assertNull(stringResult);
+
+      String[] stringResults = proxy.echo(new String[] { "foo", "bar" });
+      assertTrue(Arrays.equals(stringResults, new String[] { "foo", "bar" }));
+
+      stringResults = proxy.echo((String[]) null);
+      assertNull(stringResults);
+
+      int intResult = proxy.add(1, 2);
+      assertEquals(3, intResult);
+
+      intResult = proxy.add(new int[] { 1, 2 });
+      assertEquals(3, intResult);
+
+      boolean caught = false;
+      try {
+        proxy.error();
+      } catch (IOException e) {
+        LOG.debug("Caught " + e);
+        caught = true;
+      }
+      assertTrue(caught);
+
+      proxy.testServerGet();
+
+      // try some multi-calls
+      Method echo = TestProtocol.class.getMethod("echo",
+          new Class[] { String.class });
+      String[] strings = (String[]) RPC.call(echo, new String[][] { { "a" },
+          { "b" } }, new InetSocketAddress[] { addr, addr }, null, conf);
+      assertTrue(Arrays.equals(strings, new String[] { "a", "b" }));
+
+      Method ping = TestProtocol.class.getMethod("ping", new Class[] {});
+      Object[] voids = (Object[]) RPC.call(ping, new Object[][] { {}, {} },
+          new InetSocketAddress[] { addr, addr }, null, conf);
+      assertNull(voids);
+    } finally {
+      try {
+        if (proxy != null) {
+          RPC.stopProxy(proxy);
+        }
+      } finally {
+        server.stop();
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+
+    new TestRPC("test").testCalls();
+
+  }
+
+}
Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/ipc/TestRPC.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestBytes.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestBytes.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestBytes.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestBytes.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import junit.framework.TestCase;
+
+ /**
+  * Exercises the static helpers of {@link Bytes}: hashing, key-range
+  * splitting, primitive round trips, binary search and incrementing.
+  */
+ public class TestBytes extends TestCase {
+   /** Hashing a null array is expected to raise an exception. */
+   public void testNullHashCode() {
+     final byte[] missing = null;
+     boolean threw = false;
+     try {
+       Bytes.hashCode(missing);
+     } catch (Exception expected) {
+       threw = true;
+     }
+     assertTrue(threw);
+   }
+
+   /** Splits a three-letter key range once, then twice, checking "CCC". */
+   public void testSplit() throws Exception {
+     final byte[] low = Bytes.toBytes("AAA");
+     final byte[] mid = Bytes.toBytes("CCC");
+     byte[][] pieces = Bytes.split(low, Bytes.toBytes("EEE"), 1);
+     printAll(pieces);
+     assertEquals(3, pieces.length);
+     assertTrue(Bytes.equals(pieces[1], mid));
+
+     // Now divide into three parts. The upper bound changes so the split is even.
+     pieces = Bytes.split(low, Bytes.toBytes("DDD"), 2);
+     printAll(pieces);
+     assertEquals(4, pieces.length);
+     // The third element is expected to be "CCC".
+     assertTrue(Bytes.equals(pieces[2], mid));
+   }
+
+   /** Single split of a key range whose bounds share the "http://" prefix. */
+   public void testSplit2() throws Exception {
+     final byte[] low = Bytes.toBytes("http://A");
+     final byte[] high = Bytes.toBytes("http://z");
+     final byte[] expectedMid = Bytes.toBytes("http://]");
+     byte[][] pieces = Bytes.split(low, high, 1);
+     printAll(pieces);
+     assertEquals(3, pieces.length);
+     assertTrue(Bytes.equals(pieces[1], expectedMid));
+   }
+
+   /** A long must survive a round trip through its byte encoding. */
+   public void testToLong() throws Exception {
+     for (long expected : new long[] {-1L, 123L, 122232323232L}) {
+       assertEquals(expected, Bytes.toLong(Bytes.toBytes(expected)));
+     }
+   }
+
+   /** A float must survive a round trip through its byte encoding. */
+   public void testToFloat() throws Exception {
+     for (float expected : new float[] {-1f, 123.123f, Float.MAX_VALUE}) {
+       assertEquals(expected, Bytes.toFloat(Bytes.toBytes(expected)));
+     }
+   }
+
+   /** A double must survive a round trip through its byte encoding. */
+   public void testToDouble() throws Exception {
+     for (double expected : new double[] {Double.MIN_VALUE, Double.MAX_VALUE}) {
+       assertEquals(expected, Bytes.toDouble(Bytes.toBytes(expected)));
+     }
+   }
+
+   /**
+    * Looks keys up in a sorted table of one-byte rows. The expected indexes
+    * suggest the two int arguments select an (offset, length) slice of the key.
+    */
+   public void testBinarySearch() throws Exception {
+     final byte[][] rows = {{1}, {3}, {5}, {7}, {9}, {11}, {13}, {15}};
+     final byte[] threeOne = {3, 1};
+     final byte[] fourNine = {4, 9};
+     final byte[] four = {4};
+     final byte[] fiveEleven = {5, 11};
+     final int notFoundInsertAtTwo = -(2 + 1);
+
+     assertEquals(1,
+         Bytes.binarySearch(rows, threeOne, 0, 1, Bytes.BYTES_RAWCOMPARATOR));
+     assertEquals(0,
+         Bytes.binarySearch(rows, threeOne, 1, 1, Bytes.BYTES_RAWCOMPARATOR));
+     assertEquals(notFoundInsertAtTwo,
+         Arrays.binarySearch(rows, four, Bytes.BYTES_COMPARATOR));
+     assertEquals(notFoundInsertAtTwo,
+         Bytes.binarySearch(rows, fourNine, 0, 1, Bytes.BYTES_RAWCOMPARATOR));
+     assertEquals(4,
+         Bytes.binarySearch(rows, fourNine, 1, 1, Bytes.BYTES_RAWCOMPARATOR));
+     assertEquals(2,
+         Bytes.binarySearch(rows, fiveEleven, 0, 1, Bytes.BYTES_RAWCOMPARATOR));
+     assertEquals(5,
+         Bytes.binarySearch(rows, fiveEleven, 1, 1, Bytes.BYTES_RAWCOMPARATOR));
+   }
+
+   /** Increments positive and negative longs by positive and negative amounts. */
+   public void testIncrementBytes() throws IOException {
+     final long[][] cases = {
+       {10, 1}, {12, 123435445}, {124634654, 1}, {10005460, 5005645},
+       {1, -1}, {10, -1}, {10, -5}, {1005435000, -5}, {10, -43657655},
+       {-1, 1}, {-26, 5034520}, {-10657200, 5}, {-12343250, 45376475},
+       {-10, -5}, {-12343250, -5}, {-12, -34565445}, {-1546543452, -34565445},
+     };
+     for (long[] c : cases) {
+       assertTrue(checkTestIncrementBytes(c[0], c[1]));
+     }
+   }
+
+   /**
+    * Compares {@link Bytes#incrementBytes} with plain long addition.
+    *
+    * @param val    starting value, encoded with {@code Bytes.toBytes}
+    * @param amount amount to add
+    * @return true when the decoded increment equals the arithmetic sum
+    */
+   private static boolean checkTestIncrementBytes(long val, long amount)
+       throws IOException {
+     final byte[] encoded = Bytes.toBytes(val);
+     // Reference copy: zero-filled when the leading byte is positive, else 0xFF.
+     final byte[] reference;
+     if (encoded[0] > 0) {
+       reference = new byte[Bytes.SIZEOF_LONG];
+     } else {
+       reference = new byte[] {-1, -1, -1, -1, -1, -1, -1, -1};
+     }
+     System.arraycopy(encoded, 0, reference, reference.length - encoded.length,
+         encoded.length);
+     final long actual = Bytes.toLong(Bytes.incrementBytes(encoded, amount));
+     final long expected = Bytes.toLong(reference) + amount;
+     return expected == actual;
+   }
+
+   /** Prints each element of {@code pieces} on its own line of standard output. */
+   private static void printAll(byte[][] pieces) {
+     for (byte[] piece : pieces) {
+       System.out.println(Bytes.toString(piece));
+     }
+   }
+ }
Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestBytes.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestNumeric.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestNumeric.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestNumeric.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestNumeric.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,34 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import junit.framework.TestCase;
+
+ /** Round-trip test for the double encoding in {@link Bytes}. */
+ public class TestNumeric extends TestCase {
+   final static int TEST_INT = 3; // not referenced by any test in this class
+   final static double TEST_DOUBLE = 0.4;
+
+   /** Encodes TEST_DOUBLE to bytes and checks that decoding restores it. */
+   public void testDouble() {
+     // JUnit convention: expected value first, actual value second.
+     assertEquals(TEST_DOUBLE, Bytes.toDouble(Bytes.toBytes(TEST_DOUBLE)));
+   }
+ }
Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestNumeric.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestRandomVariable.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestRandomVariable.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestRandomVariable.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestRandomVariable.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,74 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import org.apache.log4j.Logger;
+
+import junit.framework.TestCase;
+
+/**
+ * Random variable generation test
+ */
+public class TestRandomVariable extends TestCase {
+ static final Logger LOG = Logger.getLogger(TestRandomVariable.class);
+ final static int COUNT = 50;
+
+ /**
+ * Random object test
+ *
+ * @throws Exception
+ */
+ public void testRand() throws Exception {
+ for (int i = 0; i < COUNT; i++) {
+ double result = RandomVariable.rand();
+ assertTrue(result >= 0.0d && result <= 1.0);
+ }
+ }
+
+ /**
+ * Random integer test
+ *
+ * @throws Exception
+ */
+ public void testRandInt() throws Exception {
+ final int min = 122;
+ final int max = 561;
+
+ for (int i = 0; i < COUNT; i++) {
+ int result = RandomVariable.randInt(min, max);
+ assertTrue(result >= min && result <= max);
+ }
+ }
+
+ /**
+ * Uniform test
+ *
+ * @throws Exception
+ */
+ public void testUniform() throws Exception {
+ final double min = 1.0d;
+ final double max = 3.0d;
+
+ for (int i = 0; i < COUNT; i++) {
+ double result = RandomVariable.uniform(min, max);
+ assertTrue(result >= min && result <= max);
+ }
+ }
+}
Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/util/TestRandomVariable.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/zookeeper/TestZKTools.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/zookeeper/TestZKTools.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/zookeeper/TestZKTools.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/zookeeper/TestZKTools.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.zookeeper;
+
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+
+import junit.framework.TestCase;
+
+ public class TestZKTools extends TestCase {
+   /** Expects the quorum host paired with the configured client port. */
+   public void testZKProps() {
+     final HamaConfiguration settings = new HamaConfiguration();
+     settings.set(Constants.ZOOKEEPER_QUORUM, "test.com:123");
+     settings.set(Constants.ZOOKEEPER_CLIENT_PORT, "2222");
+     final String servers = QuorumPeer.getZKQuorumServersString(settings);
+     assertEquals("test.com:2222", servers);
+   }
+ }
Propchange: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/zookeeper/TestZKTools.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/test/java/testjar/ClassSerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/testjar/ClassSerializePrinting.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/testjar/ClassSerializePrinting.java (added)
+++ incubator/hama/branches/HamaV2/core/src/test/java/testjar/ClassSerializePrinting.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package testjar;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.zookeeper.KeeperException;
+
+public class ClassSerializePrinting {
+ private static String TMP_OUTPUT = "/tmp/test-example/";
+
+ public static class HelloBSP extends BSP {
+ public static final Log LOG = LogFactory.getLog(HelloBSP.class);
+ private Configuration conf;
+ private final static int PRINT_INTERVAL = 1000;
+ private FileSystem fileSys;
+ private int num;
+
+ public void bsp(BSPPeer bspPeer) throws IOException,
+ KeeperException, InterruptedException {
+
+ int i = 0;
+ for (String otherPeer : bspPeer.getAllPeerNames()) {
+ String peerName = bspPeer.getPeerName();
+ if (peerName.equals(otherPeer)) {
+ writeLogToFile(peerName, i);
+ }
+
+ Thread.sleep(PRINT_INTERVAL);
+ bspPeer.sync();
+ i++;
+ }
+ }
+
+ private void writeLogToFile(String string, int i) throws IOException {
+ SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+ new Path(TMP_OUTPUT + i), LongWritable.class, Text.class,
+ CompressionType.NONE);
+ writer.append(new LongWritable(System.currentTimeMillis()), new Text(
+ "Hello BSP from " + (i + 1) + " of " + num + ": " + string));
+ writer.close();
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ num = Integer.parseInt(conf.get("bsp.peers.num"));
+ try {
+ fileSys = FileSystem.get(conf);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+}
Propchange: incubator/hama/branches/HamaV2/core/src/test/java/testjar/ClassSerializePrinting.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/branches/HamaV2/core/src/test/java/testjar/testjob.jar
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/testjar/testjob.jar?rev=1176297&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/hama/branches/HamaV2/core/src/test/java/testjar/testjob.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Propchange: incubator/hama/branches/HamaV2/core/target/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Sep 27 09:35:21 2011
@@ -0,0 +1 @@
+*
Added: incubator/hama/branches/HamaV2/core/target/.plxarc
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/target/.plxarc?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/target/.plxarc (added)
+++ incubator/hama/branches/HamaV2/core/target/.plxarc Tue Sep 27 09:35:21 2011
@@ -0,0 +1 @@
+maven-shared-archive-resources
\ No newline at end of file
Added: incubator/hama/branches/HamaV2/core/target/classes/META-INF/DEPENDENCIES
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/target/classes/META-INF/DEPENDENCIES?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/target/classes/META-INF/DEPENDENCIES (added)
+++ incubator/hama/branches/HamaV2/core/target/classes/META-INF/DEPENDENCIES Tue Sep 27 09:35:21 2011
@@ -0,0 +1,103 @@
+// ------------------------------------------------------------------
+// Transitive dependencies of this project determined from the
+// maven pom organized by organization.
+// ------------------------------------------------------------------
+
+Apache Hama Core
+
+
+From: 'an unknown organization'
+ - geronimo-spec-jta geronimo-spec:geronimo-spec-jta:jar:1.0.1B-rc4
+
+ - HSQLDB (http://hsqldb.org/) hsqldb:hsqldb:jar:1.8.0.10
+ License: HSQLDB License (http://hsqldb.org/web/hsqlLicense.html)
+ - JLine (http://jline.sourceforge.net) jline:jline:jar:0.9.94
+ License: BSD (LICENSE.txt)
+ - An open source Java toolkit for Amazon S3 (http://jets3t.s3.amazonaws.com/index.html) net.java.dev.jets3t:jets3t:jar:0.7.1
+ License: Apache License, Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
+ - kosmosfs (http://kosmosfs.sourceforge.net/) net.sf.kosmosfs:kfs:jar:0.3
+ License: The Apache Software License, Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
+ - hadoop-core org.apache.hadoop:hadoop-core:jar:0.20.2
+
+ - hadoop-test org.apache.hadoop:hadoop-test:jar:0.20.2
+
+ - servlet-api org.apache.tomcat:servlet-api:jar:6.0.32
+
+ - zookeeper org.apache.zookeeper:zookeeper:jar:3.3.1
+
+ - Eclipse JDT Core (http://www.eclipse.org/jdt/) org.eclipse.jdt:core:jar:3.1.1
+ License: Eclipse Public License v1.0 (http://www.eclipse.org/org/documents/epl-v10.php)
+ - oro oro:oro:jar:2.0.8
+
+ - jasper-compiler tomcat:jasper-compiler:jar:5.5.12
+
+ - jasper-runtime tomcat:jasper-runtime:jar:5.5.12
+
+ - xmlenc Library (http://xmlenc.sourceforge.net) xmlenc:xmlenc:jar:0.52
+ License: The BSD License (http://www.opensource.org/licenses/bsd-license.php)
+
+From: 'Apache MINA Project' (http://mina.apache.org/)
+ - Apache MINA Core (http://mina.apache.org/mina-core) org.apache.mina:mina-core:bundle:2.0.0-M5
+ License: Apache 2.0 License (http://www.apache.org/licenses/LICENSE-2.0)
+
+From: 'Apache Software Foundation' (http://jakarta.apache.org/)
+ - HttpClient (http://jakarta.apache.org/commons/httpclient/) commons-httpclient:commons-httpclient:jar:3.0.1
+ License: Apache License (http://www.apache.org/licenses/LICENSE-2.0)
+
+From: 'Apache Software Foundation' (http://www.apache.org)
+ - Annotation 1.0 (http://geronimo.apache.org/specs/geronimo-annotation_1.0_spec) org.apache.geronimo.specs:geronimo-annotation_1.0_spec:jar:1.0
+ License: The Apache Software License, Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
+
+From: 'Apache Software Foundation'
+ - org.apache.tools.ant (http://ant.apache.org/ant/) org.apache.ant:ant:jar:1.7.1
+
+ - ant-launcher (http://ant.apache.org/ant-launcher/) org.apache.ant:ant-launcher:jar:1.7.1
+
+
+From: 'Mort Bay Consulting' (http://www.mortbay.com)
+ - Jetty Server (http://jetty.mortbay.org/project/modules/jetty) org.mortbay.jetty:jetty:jar:6.1.14
+ License: Apache License Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
+ - Servlet Annotations (http://jetty.mortbay.org/project/jetty-annotations) org.mortbay.jetty:jetty-annotations:jar:6.1.14
+ License: Apache License Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
+ - Jetty Plus (http://jetty.mortbay.org/project/jetty-plus) org.mortbay.jetty:jetty-plus:jar:6.1.14
+ License: Apache License Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
+ - Jetty Utilities (http://jetty.mortbay.org/project/jetty-util) org.mortbay.jetty:jetty-util:jar:6.1.14
+ License: Apache License Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
+ - Glassfish Jasper (http://jetty.mortbay.org/project/modules/jsp-2.1) org.mortbay.jetty:jsp-2.1:jar:6.1.14
+ License: CDDL 1.0 (https://glassfish.dev.java.net/public/CDDLv1.0.html)
+ - Glassfish Jasper API (http://jetty.mortbay.org/project/modules/jsp-api-2.1) org.mortbay.jetty:jsp-api-2.1:jar:6.1.14
+ License: Apache License Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
+ - Servlet Specification 2.5 API (http://jetty.mortbay.org/project/modules/servlet-api-2.5) org.mortbay.jetty:servlet-api-2.5:jar:6.1.14
+ License: CDDL 1.0 (https://glassfish.dev.java.net/public/CDDLv1.0.html)
+
+From: 'QOS.ch' (http://www.qos.ch)
+ - SLF4J API Module (http://www.slf4j.org) org.slf4j:slf4j-api:jar:1.5.2
+
+ - SLF4J LOG4J-12 Binding (http://www.slf4j.org) org.slf4j:slf4j-log4j12:jar:1.5.2
+
+
+From: 'The Apache Software Foundation' (http://jakarta.apache.org)
+ - Codec (http://jakarta.apache.org/commons/codec/) commons-codec:commons-codec:jar:1.3
+ License: The Apache Software License, Version 2.0 (/LICENSE.txt)
+ - EL (http://jakarta.apache.org/commons/el/) commons-el:commons-el:jar:1.0
+ License: The Apache Software License, Version 2.0 (/LICENSE.txt)
+ - Jakarta Commons Net (http://jakarta.apache.org/commons/${pom.artifactId.substring(8)}/) commons-net:commons-net:jar:1.4.1
+ License: The Apache Software License, Version 2.0 (/LICENSE.txt)
+
+From: 'The Apache Software Foundation' (http://www.apache.org/)
+ - ant (http://www.apache.org/ant/) ant:ant:jar:1.6.5
+ License: The Apache Software License, Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
+ - Commons CLI (http://commons.apache.org/cli/) commons-cli:commons-cli:jar:1.2
+ License: The Apache Software License, Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
+ - Commons Logging (http://commons.apache.org/logging) commons-logging:commons-logging:jar:1.1.1
+ License: The Apache Software License, Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
+ - Apache Ftplet API (http://mina.apache.org/ftpserver) org.apache.ftpserver:ftplet-api:bundle:1.0.0
+ License: Apache 2.0 License (http://www.apache.org/licenses/LICENSE-2.0)
+ - Apache FtpServer Core (http://mina.apache.org/ftpserver/ftpserver-core) org.apache.ftpserver:ftpserver-core:bundle:1.0.0
+ License: Apache 2.0 License (http://www.apache.org/licenses/LICENSE-2.0)
+ - Apache FtpServer Deprecated classes (http://mina.apache.org/ftpserver/ftpserver-deprecated) org.apache.ftpserver:ftpserver-deprecated:jar:1.0.0-M2
+ License: Apache 2.0 License (http://www.apache.org/licenses/LICENSE-2.0)
+
+
+
+