You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/06/28 04:20:42 UTC
svn commit: r958439 - in /incubator/hama/trunk: CHANGES.txt conf/hama-env.sh
src/examples/org/apache/hama/examples/PiEstimator.java
src/java/org/apache/hama/Constants.java
src/java/org/apache/hama/bsp/LocalJobRunner.java
Author: edwardyoon
Date: Mon Jun 28 02:20:41 2010
New Revision: 958439
URL: http://svn.apache.org/viewvc?rev=958439&view=rev
Log:
Implement killJob() method for local job
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/conf/hama-env.sh
incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
incubator/hama/trunk/src/java/org/apache/hama/Constants.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=958439&r1=958438&r2=958439&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Mon Jun 28 02:20:41 2010
@@ -45,6 +45,7 @@ Trunk (unreleased changes)
IMPROVEMENTS
+ HAMA-273: Implement killJob() method for local job (edwardyoon)
HAMA-271: Add LocalJobRunner (edwardyoon)
HAMA-264: Moving BlockID, Pair classes to examples package (edwardyoon)
HAMA-269: Add getter for serverName to BSPPeerInterface (edwardyoon)
Modified: incubator/hama/trunk/conf/hama-env.sh
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/hama-env.sh?rev=958439&r1=958438&r2=958439&view=diff
==============================================================================
--- incubator/hama/trunk/conf/hama-env.sh (original)
+++ incubator/hama/trunk/conf/hama-env.sh Mon Jun 28 02:20:41 2010
@@ -22,7 +22,7 @@
# Set environment variables here.
# The java implementation to use. Required.
-export JAVA_HOME=/usr/lib/jvm/java-6-sun
+# export JAVA_HOME=/usr/java/jdk1.6.0_20
# hadoop conf dir. to find the mapreduce cluster.
# export HADOOP_CONF_DIR=/usr/local/src/hadoop-0.20.1/conf
@@ -43,4 +43,4 @@ export JAVA_HOME=/usr/lib/jvm/java-6-sun
# export HAMA_SSH_OPTS="-o ConnectTimeout=1 -o SendEnv=HAMA_CONF_DIR"
# Tell Hama whether it should manage its own instance of Zookeeper or not.
-# export HAMA_MANAGES_ZK=true
\ No newline at end of file
+# export HAMA_MANAGES_ZK=true
Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=958439&r1=958438&r2=958439&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Mon Jun 28 02:20:41 2010
@@ -66,7 +66,7 @@ public class PiEstimator {
}
if (pi != 0.0)
- System.out.println("Estimated value of PI is " + pi);
+ System.out.println("\nEstimated value of PI is " + pi);
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=958439&r1=958438&r2=958439&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Mon Jun 28 02:20:41 2010
@@ -41,6 +41,9 @@ public interface Constants {
public static final long ATLEAST_WAIT_TIME = 100;
+ ///////////////////////////////////////
+ // Constants for ZooKeeper
+ ///////////////////////////////////////
/** zookeeper root */
public static final String ZOOKEEPER_ROOT = "bsp.zookeeper.root";
/** zookeeper default root */
@@ -58,10 +61,7 @@ public interface Constants {
public static final String ZOOKEEPER_PAUSE = "zookeeper.pause";
/** Default ZooKeeper pause value. In milliseconds. */
public static final int DEFAULT_ZOOKEEPER_PAUSE = 2 * 1000;
-
- ///////////////////////////////////////
- // Constants for ZooKeeper
- ///////////////////////////////////////
+
static final String ZOOKEEPER_CONFIG_NAME = "zoo.cfg";
static final int DEFAULT_ZOOKEPER_CLIENT_PORT = 21810;
static final String ZOOKEEPER_QUORUM = "hama.zookeeper.quorum";
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java?rev=958439&r1=958438&r2=958439&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java Mon Jun 28 02:20:41 2010
@@ -30,7 +30,6 @@ public class LocalJobRunner implements J
private HashMap<String, Job> jobs = new HashMap<String, Job>();
public LocalJobRunner(Configuration conf) throws IOException {
- // TODO Auto-generated constructor stub
this.fs = FileSystem.get(conf);
this.conf = conf;
}
@@ -49,7 +48,6 @@ public class LocalJobRunner implements J
@Override
public String getFilesystemName() throws IOException {
- // TODO Auto-generated method stub
return fs.getUri().toString();
}
@@ -72,15 +70,13 @@ public class LocalJobRunner implements J
@Override
public String getSystemDir() {
- // TODO Auto-generated method stub
Path sysDir = new Path(conf.get("bsp.system.dir", "/tmp/hadoop/bsp/system"));
return fs.makeQualified(sysDir).toString();
}
@Override
public void killJob(BSPJobID jobid) throws IOException {
- // TODO Auto-generated method stub
-
+ jobs.get(jobid.toString()).done();
}
@Override
@@ -116,12 +112,13 @@ public class LocalJobRunner implements J
/**
* Local Job
*/
- private class Job implements Watcher {
+ private class Job extends Thread implements Watcher {
private JobStatus status = new JobStatus();
private Configuration conf;
private int NUM_PEER;
private BSPJob job;
private List<BSPRunner> list;
+ private boolean threadDone = false;
public Job(BSPJobID jobID, String jobFile, Configuration conf)
throws IOException {
@@ -133,7 +130,8 @@ public class LocalJobRunner implements J
LOG.info("Number of BSP tasks: " + NUM_PEER);
jobs.put(jobID.toString(), this);
- ZooKeeper zk = new ZooKeeper("localhost:21810", 3000, this);
+ ZooKeeper zk = new ZooKeeper(Constants.DEFAULT_ZOOKEEPER_SERVER_ADDR,
+ 3000, this);
Stat s = null;
if (zk != null) {
try {
@@ -153,39 +151,41 @@ public class LocalJobRunner implements J
}
}
}
+ this.start();
+ }
- list = new ArrayList<BSPRunner>();
- for (int i = 0; i < NUM_PEER; i++) {
- this.conf.setInt("bsp.peers.num", NUM_PEER);
- this.conf.set(Constants.PEER_HOST, "localhost");
- this.conf.set(Constants.PEER_PORT, String.valueOf(30000 + i));
- this.conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
- this.conf.setInt("NUM_PEER", NUM_PEER);
- BSPRunner runner = (BSPRunner) ReflectionUtils.newInstance(
- BSPRunner.class, this.conf);
-
- list.add(runner);
- }
+ public void run() {
+ while (!threadDone) {
+ list = new ArrayList<BSPRunner>();
+ for (int i = 0; i < NUM_PEER; i++) {
+ this.conf.set(Constants.PEER_PORT, String.valueOf(30000 + i));
+ BSPRunner runner = (BSPRunner) ReflectionUtils.newInstance(
+ BSPRunner.class, this.conf);
+ list.add(runner);
+ }
- for (int i = 0; i < NUM_PEER; i++) {
- list.get(i).start();
- }
+ for (int i = 0; i < NUM_PEER; i++) {
+ list.get(i).start();
+ }
- for (int i = 0; i < NUM_PEER; i++) {
- try {
- list.get(i).join();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ for (int i = 0; i < NUM_PEER; i++) {
+ try {
+ list.get(i).join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
+ done();
}
+ }
+ public void done() {
+ threadDone = true;
}
@Override
public void process(WatchedEvent event) {
// TODO Auto-generated method stub
-
}
}
}