You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/06/28 04:20:42 UTC

svn commit: r958439 - in /incubator/hama/trunk: CHANGES.txt conf/hama-env.sh src/examples/org/apache/hama/examples/PiEstimator.java src/java/org/apache/hama/Constants.java src/java/org/apache/hama/bsp/LocalJobRunner.java

Author: edwardyoon
Date: Mon Jun 28 02:20:41 2010
New Revision: 958439

URL: http://svn.apache.org/viewvc?rev=958439&view=rev
Log:
Implement killJob() method for local job

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/conf/hama-env.sh
    incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
    incubator/hama/trunk/src/java/org/apache/hama/Constants.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=958439&r1=958438&r2=958439&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Mon Jun 28 02:20:41 2010
@@ -45,6 +45,7 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
     
+    HAMA-273: Implement killJob() method for local job (edwardyoon)
     HAMA-271: Add LocalJobRunner (edwardyoon)
     HAMA-264: Moving BlockID, Pair classes to examples package (edwardyoon)
     HAMA-269: Add getter for serverName to BSPPeerInterface (edwardyoon)

Modified: incubator/hama/trunk/conf/hama-env.sh
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/hama-env.sh?rev=958439&r1=958438&r2=958439&view=diff
==============================================================================
--- incubator/hama/trunk/conf/hama-env.sh (original)
+++ incubator/hama/trunk/conf/hama-env.sh Mon Jun 28 02:20:41 2010
@@ -22,7 +22,7 @@
 # Set environment variables here.
 
 # The java implementation to use.  Required.
-export JAVA_HOME=/usr/lib/jvm/java-6-sun
+# export JAVA_HOME=/usr/java/jdk1.6.0_20
 
 # hadoop conf dir. to find the mapreduce cluster.
 # export HADOOP_CONF_DIR=/usr/local/src/hadoop-0.20.1/conf
@@ -43,4 +43,4 @@ export JAVA_HOME=/usr/lib/jvm/java-6-sun
 # export HAMA_SSH_OPTS="-o ConnectTimeout=1 -o SendEnv=HAMA_CONF_DIR"
 
 # Tell Hama whether it should manage it's own instance of Zookeeper or not.
-# export HAMA_MANAGES_ZK=true
\ No newline at end of file
+# export HAMA_MANAGES_ZK=true

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=958439&r1=958438&r2=958439&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Mon Jun 28 02:20:41 2010
@@ -66,7 +66,7 @@ public class PiEstimator {
       }
 
       if (pi != 0.0)
-        System.out.println("Estimated value of PI is " + pi);
+        System.out.println("\nEstimated value of PI is " + pi);
 
     }
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=958439&r1=958438&r2=958439&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Mon Jun 28 02:20:41 2010
@@ -41,6 +41,9 @@ public interface Constants {
 
   public static final long ATLEAST_WAIT_TIME = 100;
 
+  ///////////////////////////////////////
+  // Constants for ZooKeeper
+  ///////////////////////////////////////  
   /** zookeeper root */
   public static final String ZOOKEEPER_ROOT = "bsp.zookeeper.root";
   /** zookeeper default root */
@@ -58,10 +61,7 @@ public interface Constants {
   public static final String ZOOKEEPER_PAUSE = "zookeeper.pause";
   /** Default ZooKeeper pause value. In milliseconds. */
   public static final int DEFAULT_ZOOKEEPER_PAUSE = 2 * 1000;
-
-  ///////////////////////////////////////
-  // Constants for ZooKeeper
-  ///////////////////////////////////////  
+  
   static final String ZOOKEEPER_CONFIG_NAME = "zoo.cfg";
   static final int DEFAULT_ZOOKEPER_CLIENT_PORT = 21810;
   static final String ZOOKEEPER_QUORUM = "hama.zookeeper.quorum";

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java?rev=958439&r1=958438&r2=958439&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java Mon Jun 28 02:20:41 2010
@@ -30,7 +30,6 @@ public class LocalJobRunner implements J
   private HashMap<String, Job> jobs = new HashMap<String, Job>();
 
   public LocalJobRunner(Configuration conf) throws IOException {
-    // TODO Auto-generated constructor stub
     this.fs = FileSystem.get(conf);
     this.conf = conf;
   }
@@ -49,7 +48,6 @@ public class LocalJobRunner implements J
 
   @Override
   public String getFilesystemName() throws IOException {
-    // TODO Auto-generated method stub
     return fs.getUri().toString();
   }
 
@@ -72,15 +70,13 @@ public class LocalJobRunner implements J
 
   @Override
   public String getSystemDir() {
-    // TODO Auto-generated method stub
     Path sysDir = new Path(conf.get("bsp.system.dir", "/tmp/hadoop/bsp/system"));
     return fs.makeQualified(sysDir).toString();
   }
 
   @Override
   public void killJob(BSPJobID jobid) throws IOException {
-    // TODO Auto-generated method stub
-
+    jobs.get(jobid.toString()).done();
   }
 
   @Override
@@ -116,12 +112,13 @@ public class LocalJobRunner implements J
   /**
    * Local Job
    */
-  private class Job implements Watcher {
+  private class Job extends Thread implements Watcher {
     private JobStatus status = new JobStatus();
     private Configuration conf;
     private int NUM_PEER;
     private BSPJob job;
     private List<BSPRunner> list;
+    private boolean threadDone = false;
 
     public Job(BSPJobID jobID, String jobFile, Configuration conf)
         throws IOException {
@@ -133,7 +130,8 @@ public class LocalJobRunner implements J
       LOG.info("Number of BSP tasks: " + NUM_PEER);
       jobs.put(jobID.toString(), this);
 
-      ZooKeeper zk = new ZooKeeper("localhost:21810", 3000, this);
+      ZooKeeper zk = new ZooKeeper(Constants.DEFAULT_ZOOKEEPER_SERVER_ADDR,
+          3000, this);
       Stat s = null;
       if (zk != null) {
         try {
@@ -153,39 +151,41 @@ public class LocalJobRunner implements J
           }
         }
       }
+      this.start();
+    }
 
-      list = new ArrayList<BSPRunner>();
-      for (int i = 0; i < NUM_PEER; i++) {
-        this.conf.setInt("bsp.peers.num", NUM_PEER);
-        this.conf.set(Constants.PEER_HOST, "localhost");
-        this.conf.set(Constants.PEER_PORT, String.valueOf(30000 + i));
-        this.conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
-        this.conf.setInt("NUM_PEER", NUM_PEER);
-        BSPRunner runner = (BSPRunner) ReflectionUtils.newInstance(
-            BSPRunner.class, this.conf);
-
-        list.add(runner);
-      }
+    public void run() {
+      while (!threadDone) {
+        list = new ArrayList<BSPRunner>();
+        for (int i = 0; i < NUM_PEER; i++) {
+          this.conf.set(Constants.PEER_PORT, String.valueOf(30000 + i));
+          BSPRunner runner = (BSPRunner) ReflectionUtils.newInstance(
+              BSPRunner.class, this.conf);
+          list.add(runner);
+        }
 
-      for (int i = 0; i < NUM_PEER; i++) {
-        list.get(i).start();
-      }
+        for (int i = 0; i < NUM_PEER; i++) {
+          list.get(i).start();
+        }
 
-      for (int i = 0; i < NUM_PEER; i++) {
-        try {
-          list.get(i).join();
-        } catch (InterruptedException e) {
-          // TODO Auto-generated catch block
-          e.printStackTrace();
+        for (int i = 0; i < NUM_PEER; i++) {
+          try {
+            list.get(i).join();
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
         }
+        done();
       }
+    }
 
+    public void done() {
+      threadDone = true;
     }
 
     @Override
     public void process(WatchedEvent event) {
       // TODO Auto-generated method stub
-
     }
   }
 }