You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/06/27 10:16:21 UTC
svn commit: r958349 - in /incubator/hama/trunk: ./
src/examples/org/apache/hama/examples/ src/java/org/apache/hama/bsp/
src/java/org/apache/hama/ipc/ src/test/org/apache/hama/bsp/
Author: edwardyoon
Date: Sun Jun 27 08:16:20 2010
New Revision: 958349
URL: http://svn.apache.org/viewvc?rev=958349&view=rev
Log:
Add LocalJobRunner and Pi estimator example
Added:
incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
Removed:
incubator/hama/trunk/src/test/org/apache/hama/bsp/UserInterface.java
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPInterface.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java
incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPPeerTest.java
incubator/hama/trunk/src/test/org/apache/hama/bsp/SerializePrinting.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=958349&r1=958348&r2=958349&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Sun Jun 27 08:16:20 2010
@@ -4,6 +4,7 @@ Trunk (unreleased changes)
NEW FEATURES
+ HAMA-265: Add example Pi estimator based on BSP (edwardyoon)
HAMA-266: Add Ant target to generate the UML class diagrams (edwardyoon)
HAMA-209: Add example for Cosine Similarity Matrix (edwardyoon)
HAMA-195: Interface of the bsp (hyunsik via edwardyoon)
@@ -42,7 +43,8 @@ Trunk (unreleased changes)
HAMA-2: The initial donation of Hama from the google project (edwardyoon)
IMPROVEMENTS
-
+
+ HAMA-271: Add LocalJobRunner (edwardyoon)
HAMA-264: Moving BlockID, Pair classes to examples package (edwardyoon)
HAMA-269: Add getter for serverName to BSPPeerInterface (edwardyoon)
HAMA-257: Design BSP program code interface (edwardyoon)
Added: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=958349&view=auto
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (added)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Sun Jun 27 08:16:20 2010
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPJobClient;
+import org.apache.hama.bsp.BSPMessage;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.zookeeper.KeeperException;
+import org.mortbay.log.Log;
+
+/**
+ * Example BSP job that estimates the value of Pi with a Monte Carlo method.
+ *
+ * Every BSP task samples random points in the square [-1, 1) x [-1, 1),
+ * counts how many fall inside the unit circle and sends its own estimate
+ * to a single collecting peer. After the barrier synchronization the
+ * collecting peer prints the arithmetic mean of all received estimates.
+ */
+public class PiEstimator {
+
+  /** Host of the peer that collects the partial estimates. */
+  private static final String MASTER_HOST = "localhost";
+
+  /**
+   * Port of the collecting peer. 30000 is the port LocalJobRunner assigns
+   * to the first local peer (30000 + peer index).
+   */
+  private static final int MASTER_PORT = 30000;
+
+  /**
+   * BSP task that computes one partial estimate of Pi and sends it to the
+   * collecting peer.
+   */
+  public static class MyEstimator extends BSP {
+    /** Number of random points sampled by each task. */
+    private static final int ITERATIONS = 10000;
+
+    private Configuration conf;
+
+    /**
+     * Samples random points, sends the resulting estimate to the collecting
+     * peer, synchronizes, and prints the mean of every estimate this peer
+     * received. Peers that received no message print nothing.
+     *
+     * @param bspPeer the peer used to send, synchronize and receive
+     * @throws IOException propagated from the peer's send/sync calls
+     * @throws KeeperException propagated from the peer's sync call
+     * @throws InterruptedException propagated from the peer's sync call
+     */
+    @Override
+    public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
+        InterruptedException {
+      int in = 0;
+      for (int i = 0; i < ITERATIONS; i++) {
+        double x = 2.0 * Math.random() - 1.0;
+        double y = 2.0 * Math.random() - 1.0;
+        // Comparing the squared distance avoids a needless square root.
+        if (x * x + y * y < 1.0) {
+          in++;
+        }
+      }
+
+      byte[] tagName = Bytes.toBytes(getName());
+      byte[] myData = Bytes.toBytes(4.0 * in / ITERATIONS);
+      BSPMessage estimate = new BSPMessage(tagName, myData);
+
+      Log.info("Send a message (" + Bytes.toDouble(myData) + ") from "
+          + bspPeer.getServerName() + " to " + MASTER_HOST + ":" + MASTER_PORT);
+      bspPeer.send(new InetSocketAddress(MASTER_HOST, MASTER_PORT), estimate);
+      bspPeer.sync();
+
+      // Average with an explicit sum and count. Repeatedly halving a running
+      // value that starts at 0.0 weights the messages unevenly and yields
+      // roughly pi / 2 when only one message arrives.
+      double sum = 0.0;
+      int count = 0;
+      BSPMessage received;
+      while ((received = bspPeer.getCurrentMessage()) != null) {
+        sum += Bytes.toDouble(received.getData());
+        count++;
+      }
+
+      if (count > 0) {
+        System.out.println("Estimated value of PI is " + (sum / count));
+      }
+    }
+
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+    }
+
+  }
+
+  /**
+   * Configures the example with ten BSP tasks and runs it. Setting
+   * "bsp.master.address" to "local" makes BSPJobClient use the
+   * LocalJobRunner instead of a remote BSPMaster.
+   *
+   * @param args unused
+   */
+  public static void main(String[] args) throws InterruptedException,
+      IOException {
+    // BSP job configuration
+    HamaConfiguration conf = new HamaConfiguration();
+    // Execute locally
+    conf.set("bsp.master.address", "local");
+
+    BSPJob bsp = new BSPJob(conf, PiEstimator.class);
+    // Set the job name
+    bsp.setJobName("pi estimation example");
+    bsp.setBspClass(MyEstimator.class);
+
+    bsp.setNumBspTask(10);
+    BSPJobClient.runJob(bsp);
+  }
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java?rev=958349&r1=958348&r2=958349&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java Sun Jun 27 08:16:20 2010
@@ -17,27 +17,15 @@
*/
package org.apache.hama.bsp;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.zookeeper.KeeperException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* This class provides an abstract implementation of the BSP interface
*/
public abstract class BSP extends Thread implements BSPInterface {
- private BSPPeer peer;
-
- /**
- * Constructor for abstract class
- *
- * @param conf
- * @throws IOException
- */
- public BSP(Configuration conf) throws IOException {
- this.peer = new BSPPeer(conf);
- }
-
+ private static final Log LOG = LogFactory.getLog(BSP.class);
+
/**
* A thread's run method.
*
@@ -46,16 +34,9 @@ public abstract class BSP extends Thread
*/
public void run() {
try {
- bsp(peer);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (KeeperException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ bsp(new BSPPeer(this.getConf()));
+ } catch (Exception e) {
+ LOG.error(e);
}
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPInterface.java?rev=958349&r1=958348&r2=958349&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPInterface.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPInterface.java Sun Jun 27 08:16:20 2010
@@ -19,13 +19,14 @@ package org.apache.hama.bsp;
import java.io.IOException;
+import org.apache.hadoop.conf.Configurable;
import org.apache.zookeeper.KeeperException;
/**
* Interface BSP defines the basic operations needed to implement the BSP
* algorithm.
*/
-public interface BSPInterface {
+public interface BSPInterface extends Configurable {
/**
* A user defined function for programming in the BSP style.
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java?rev=958349&r1=958348&r2=958349&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java Sun Jun 27 08:16:20 2010
@@ -24,8 +24,6 @@ import java.net.URLDecoder;
import java.util.Enumeration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hama.HamaConfiguration;
public class BSPJob extends BSPJobContext {
@@ -51,6 +49,16 @@ public class BSPJob extends BSPJobContex
jobClient = new BSPJobClient(conf);
}
+ /**
+ * Creates a job from a job id and the location of its job file. The job
+ * file string is wrapped in a Path and handed, together with the id, to
+ * the BSPJobContext constructor.
+ *
+ * @param jobID the id of the job
+ * @param jobFile the path of the job file, as a string
+ * @throws IOException declared for the superclass constructor call
+ */
+ public BSPJob(BSPJobID jobID, String jobFile) throws IOException {
+ super(new Path(jobFile), jobID);
+ }
+
+ /**
+ * Creates a job from the given configuration by delegating to the
+ * BSPJob(HamaConfiguration) constructor, then passes exampleClass to
+ * setJarByClass (presumably to locate the jar that contains the class;
+ * confirm against setJarByClass).
+ *
+ * @param conf the job configuration
+ * @param exampleClass the class handed to setJarByClass
+ * @throws IOException declared for the delegated constructor call
+ */
+ @SuppressWarnings("unchecked")
+ public BSPJob(HamaConfiguration conf, Class exampleClass) throws IOException {
+ this(conf);
+ setJarByClass(exampleClass);
+ }
+
private void ensureState(JobState state) throws IllegalStateException {
if (state != this.state) {
throw new IllegalStateException("Job in state " + this.state
@@ -78,6 +86,11 @@ public class BSPJob extends BSPJobContex
ensureState(JobState.DEFINE);
conf.setClass(WORK_CLASS_ATTR, cls, BSP.class);
}
+
+ /**
+ * Returns the class stored in the configuration under WORK_CLASS_ATTR.
+ * BSP.class is passed to conf.getClass as the default value. The cast to
+ * a BSP subclass is unchecked, hence the suppressed warning.
+ *
+ * @return the configured BSP class
+ */
+ @SuppressWarnings("unchecked")
+ public Class<? extends BSP> getBspClass() {
+ return (Class<? extends BSP>) conf.getClass(WORK_CLASS_ATTR, BSP.class);
+ }
public void setJar(String jar) {
conf.set("bsp.jar", jar);
@@ -122,16 +135,6 @@ public class BSPJob extends BSPJobContex
}
- public void setInputFormat(Class<? extends InputFormat> cls) {
- ensureState(JobState.DEFINE);
- conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, InputFormat.class);
- }
-
- public void setOutputFormat(Class<? extends OutputFormat> cls) {
- ensureState(JobState.DEFINE);
- conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, OutputFormat.class);
- }
-
public void setUser(String user) {
conf.set("user.name", user);
}
@@ -192,4 +195,9 @@ public class BSPJob extends BSPJobContex
public void set(String name, String value) {
conf.set(name, value);
}
+
+ /**
+ * Stores the number of BSP tasks in the configuration under the key
+ * "bsp.peers.num", the same key LocalJobRunner reads to decide how many
+ * local peers to start.
+ *
+ * @param tasks the number of BSP tasks
+ */
+ public void setNumBspTask(int tasks) {
+ conf.setInt("bsp.peers.num", tasks);
+ }
+
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=958349&r1=958348&r2=958349&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Sun Jun 27 08:16:20 2010
@@ -172,9 +172,6 @@ public class BSPJobClient extends Config
final static FsPermission JOB_DIR_PERMISSION =
FsPermission.createImmutable((short) 0777); // rwx-rwx-rwx
- public BSPJobClient() {
- }
-
public BSPJobClient(Configuration conf) throws IOException {
setConf(conf);
init(conf);
@@ -182,8 +179,12 @@ public class BSPJobClient extends Config
public void init(Configuration conf) throws IOException {
// it will be used to determine if the bspmaster is running on local or not.
- //String tracker = conf.get("bsp.master.address", "local");
- this.jobSubmitClient = createRPCProxy(BSPMaster.getAddress(conf), conf);
+ String master = conf.get("bsp.master.address", "local");
+ if ("local".equals(master)) {
+ this.jobSubmitClient = new LocalJobRunner(conf);
+ } else {
+ this.jobSubmitClient = createRPCProxy(BSPMaster.getAddress(conf), conf);
+ }
}
private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
@@ -305,7 +306,7 @@ public class BSPJobClient extends Config
public
RunningJob submitJobInternal(BSPJob job) throws IOException {
- BSPJobID jobId = jobSubmitClient.getNewJobId();
+ BSPJobID jobId = jobSubmitClient.getNewJobId();
Path submitJobDir = new Path(getSystemDir(), jobId.toString());
Path submitJarFile = new Path(submitJobDir, "job.jar");
Path submitJobFile = new Path(submitJobDir, "job.xml");
@@ -365,7 +366,7 @@ public class BSPJobClient extends Config
//
// Now, actually submit the job (using the submit name)
//
- JobStatus status = jobSubmitClient.submitJob(jobId);
+ JobStatus status = jobSubmitClient.submitJob(jobId, submitJobFile.toString());
if (status != null) {
return new NetworkedJob(status);
} else {
@@ -416,9 +417,10 @@ public class BSPJobClient extends Config
return sysDir;
}
- public RunningJob runJob(BSPJob job) throws FileNotFoundException,
+ public static RunningJob runJob(BSPJob job) throws FileNotFoundException,
IOException {
- return submitJobInternal(job);
+ BSPJobClient jc = new BSPJobClient(job.getConf());
+ return jc.submitJobInternal(job);
}
/**
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=958349&r1=958348&r2=958349&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Sun Jun 27 08:16:20 2010
@@ -591,4 +591,10 @@ public class BSPMaster extends Thread im
public void shutdown() {
this.interTrackerServer.stop();
}
+
+  /**
+   * Submits a job identified by its id and job file.
+   *
+   * BSPJobClient now calls this overload and treats a null status as a
+   * failed submission, so returning null here would break every non-local
+   * submission. Until the job file is used by the master, delegate to the
+   * existing single-argument overload.
+   *
+   * @param jobID the id of the job to submit
+   * @param jobFile the path of the job file (currently unused by the master)
+   * @return the status reported by submitJob(BSPJobID)
+   * @throws IOException if the delegated submission fails
+   */
+  @Override
+  public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException {
+    // NOTE(review): assumes submitJob(BSPJobID) performs the real
+    // submission - confirm before relying on this in a cluster.
+    return submitJob(jobID);
+  }
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=958349&r1=958348&r2=958349&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Sun Jun 27 08:16:20 2010
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentLi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
@@ -167,7 +168,7 @@ public class BSPPeer implements Watcher,
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(bspRoot, true);
- if (list.size() < Integer.valueOf(conf.get("bsp.peers.num"))) {
+ if (list.size() < conf.getInt("bsp.peers.num", 0)) {
mutex.wait();
} else {
return true;
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java?rev=958349&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java Sun Jun 27 08:16:20 2010
@@ -0,0 +1,42 @@
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class BSPRunner extends Thread implements Configurable {
+ private static final Log LOG = LogFactory.getLog(BSPRunner.class);
+ private BSPPeer bspPeer;
+ private Configuration conf;
+ private BSP bsp;
+
+ public void run() {
+ try {
+ bsp.bsp(bspPeer);
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ try {
+ this.bspPeer = new BSPPeer(conf);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ bsp = (BSP) ReflectionUtils.newInstance(conf.getClass("bsp.work.class",
+ BSP.class), conf);
+ }
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java?rev=958349&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java Sun Jun 27 08:16:20 2010
@@ -0,0 +1,191 @@
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.Constants;
+import org.apache.hama.ipc.InterTrackerProtocol;
+import org.apache.hama.ipc.JobSubmissionProtocol;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+public class LocalJobRunner implements JobSubmissionProtocol {
+ private static final Log LOG = LogFactory.getLog(BSPJobClient.class);
+ private FileSystem fs;
+ private Configuration conf;
+ private int nextJobId = 1;
+ private HashMap<String, Job> jobs = new HashMap<String, Job>();
+
+ public LocalJobRunner(Configuration conf) throws IOException {
+ // TODO Auto-generated constructor stub
+ this.fs = FileSystem.get(conf);
+ this.conf = conf;
+ }
+
+ @Override
+ public JobStatus[] getAllJobs() throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public String getFilesystemName() throws IOException {
+ // TODO Auto-generated method stub
+ return fs.getUri().toString();
+ }
+
+ @Override
+ public JobProfile getJobProfile(BSPJobID jobid) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public JobStatus getJobStatus(BSPJobID jobid) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public BSPJobID getNewJobId() throws IOException {
+ return new BSPJobID("local", nextJobId++);
+ }
+
+ @Override
+ public String getSystemDir() {
+ // TODO Auto-generated method stub
+ Path sysDir = new Path(conf.get("bsp.system.dir", "/tmp/hadoop/bsp/system"));
+ return fs.makeQualified(sysDir).toString();
+ }
+
+ @Override
+ public void killJob(BSPJobID jobid) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public boolean killTask(TaskAttemptID taskId, boolean shouldFail)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException {
+ return new Job(jobID, jobFile, this.conf).status;
+ }
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ if (protocol.equals(InterTrackerProtocol.class.getName())) {
+ return InterTrackerProtocol.versionID;
+ } else if (protocol.equals(JobSubmissionProtocol.class.getName())) {
+ return JobSubmissionProtocol.versionID;
+ } else {
+ throw new IOException("Unknown protocol to job tracker: " + protocol);
+ }
+ }
+
+ @Override
+ public JobStatus submitJob(BSPJobID jobName) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /**
+ * Local Job
+ */
+ private class Job implements Watcher {
+ private JobStatus status = new JobStatus();
+ private Configuration conf;
+ private int NUM_PEER;
+ private BSPJob job;
+ private List<BSPRunner> list;
+
+ public Job(BSPJobID jobID, String jobFile, Configuration conf)
+ throws IOException {
+ this.conf = conf;
+ this.NUM_PEER = conf.getInt("bsp.peers.num", 0);
+ LOG.info("LocalJobRunner: " + jobID + ", " + jobFile);
+ this.job = new BSPJob(jobID, jobFile);
+ LOG.info("Jar file: " + job.getJar());
+ LOG.info("Number of BSP tasks: " + NUM_PEER);
+ jobs.put(jobID.toString(), this);
+
+ ZooKeeper zk = new ZooKeeper("localhost:21810", 3000, this);
+ Stat s = null;
+ if (zk != null) {
+ try {
+ s = zk.exists(Constants.DEFAULT_ZOOKEEPER_ROOT, false);
+ } catch (Exception e) {
+ LOG.error(s);
+ }
+
+ if (s == null) {
+ try {
+ zk.create(Constants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch (KeeperException e) {
+ LOG.error(e);
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ }
+ }
+ }
+
+ list = new ArrayList<BSPRunner>();
+ for (int i = 0; i < NUM_PEER; i++) {
+ this.conf.setInt("bsp.peers.num", NUM_PEER);
+ this.conf.set(Constants.PEER_HOST, "localhost");
+ this.conf.set(Constants.PEER_PORT, String.valueOf(30000 + i));
+ this.conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
+ this.conf.setInt("NUM_PEER", NUM_PEER);
+ BSPRunner runner = (BSPRunner) ReflectionUtils.newInstance(
+ BSPRunner.class, this.conf);
+
+ list.add(runner);
+ }
+
+ for (int i = 0; i < NUM_PEER; i++) {
+ list.get(i).start();
+ }
+
+ for (int i = 0; i < NUM_PEER; i++) {
+ try {
+ list.get(i).join();
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ // TODO Auto-generated method stub
+
+ }
+ }
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java?rev=958349&r1=958348&r2=958349&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java Sun Jun 27 08:16:20 2010
@@ -102,6 +102,9 @@ public interface JobSubmissionProtocol e
* @param shouldFail if true the task is failed and added to failed tasks list, otherwise
* it is just killed, w/o affecting job failure status.
*/
- public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException;
+ public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException;
+
+
+ JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException;
}
Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPPeerTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPPeerTest.java?rev=958349&r1=958348&r2=958349&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPPeerTest.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPPeerTest.java Sun Jun 27 08:16:20 2010
@@ -155,7 +155,7 @@ public class BSPPeerTest extends HamaClu
BSPPeerThread thread;
for (int i = 0; i < NUM_PEER; i++) {
- conf.set("bsp.peers.num", String.valueOf(NUM_PEER));
+ conf.setInt("bsp.peers.num", NUM_PEER);
conf.set(Constants.PEER_HOST, "localhost");
conf.set(Constants.PEER_PORT, String.valueOf(30000 + i));
conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/SerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/SerializePrinting.java?rev=958349&r1=958348&r2=958349&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/SerializePrinting.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/SerializePrinting.java Sun Jun 27 08:16:20 2010
@@ -60,7 +60,7 @@ public class SerializePrinting extends H
BSPPeerThread thread;
int[] randomSequence = new int[] { 2, 3, 4, 5, 0, 1, 6, 7, 8, 9 };
for (int i = 0; i < NUM_PEER; i++) {
- conf.set("bsp.peers.num", String.valueOf(NUM_PEER));
+ conf.setInt("bsp.peers.num", NUM_PEER);
conf.set(Constants.PEER_HOST, "localhost");
conf.set(Constants.PEER_PORT, String
.valueOf(30000 + randomSequence[i]));