You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/06/27 10:16:21 UTC

svn commit: r958349 - in /incubator/hama/trunk: ./ src/examples/org/apache/hama/examples/ src/java/org/apache/hama/bsp/ src/java/org/apache/hama/ipc/ src/test/org/apache/hama/bsp/

Author: edwardyoon
Date: Sun Jun 27 08:16:20 2010
New Revision: 958349

URL: http://svn.apache.org/viewvc?rev=958349&view=rev
Log:
Add LocalJobRunner and Pi estimator example

Added:
    incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
Removed:
    incubator/hama/trunk/src/test/org/apache/hama/bsp/UserInterface.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPInterface.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java
    incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPPeerTest.java
    incubator/hama/trunk/src/test/org/apache/hama/bsp/SerializePrinting.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=958349&r1=958348&r2=958349&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Sun Jun 27 08:16:20 2010
@@ -4,6 +4,7 @@ Trunk (unreleased changes)
 
   NEW FEATURES
 
+    HAMA-265: Add example Pi estimator based on BSP (edwardyoon)
     HAMA-266: Add Ant target to generate the UML class diagrams (edwardyoon)
     HAMA-209: Add example for Cosine Similarity Matrix (edwardyoon)
     HAMA-195: Interface of the bsp (hyunsik via edwardyoon)
@@ -42,7 +43,8 @@ Trunk (unreleased changes)
     HAMA-2: The intial donation of Hama from the google project (edwardyoon)
 
   IMPROVEMENTS
-
+    
+    HAMA-271: Add LocalJobRunner (edwardyoon)
     HAMA-264: Moving BlockID, Pair classes to examples package (edwardyoon)
     HAMA-269: Add getter for serverName to BSPPeerInterface (edwardyoon)
     HAMA-257: Design BSP program code interface (edwardyoon)

Added: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=958349&view=auto
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (added)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Sun Jun 27 08:16:20 2010
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPJobClient;
+import org.apache.hama.bsp.BSPMessage;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.zookeeper.KeeperException;
+import org.mortbay.log.Log;
+
+/**
+ * Example BSP job that estimates the value of PI with a Monte Carlo method.
+ * Every BSP task samples random points, sends its own estimate to the peer
+ * listening on localhost:30000, and that peer prints the mean of all
+ * estimates it received.
+ */
+public class PiEstimator {
+
+  public static class MyEstimator extends BSP {
+    /** Host of the peer that collects the partial estimates. */
+    private static final String COLLECTOR_HOST = "localhost";
+    /** Port of the peer that collects the partial estimates. */
+    private static final int COLLECTOR_PORT = 30000;
+    /** Number of random points sampled by each task. */
+    private static final int iterations = 10000;
+
+    private Configuration conf;
+
+    /**
+     * Samples {@code iterations} points in the square [-1, 1) x [-1, 1),
+     * sends 4 * (hits inside the unit circle) / iterations to the collector
+     * peer, and after the barrier prints the arithmetic mean of the
+     * estimates delivered to this peer, if there are any.
+     */
+    @Override
+    public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
+        InterruptedException {
+      int in = 0;
+      for (int i = 0; i < iterations; i++) {
+        double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
+        // The point is inside the unit circle when x^2 + y^2 < 1.
+        if (x * x + y * y < 1.0) {
+          in++;
+        }
+      }
+
+      byte[] tagName = Bytes.toBytes(getName());
+      byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
+      BSPMessage estimate = new BSPMessage(tagName, myData);
+
+      Log.info("Send a message (" + Bytes.toDouble(myData) + ") from "
+          + bspPeer.getServerName() + " to " + COLLECTOR_HOST + ":"
+          + COLLECTOR_PORT);
+      bspPeer.send(new InetSocketAddress(COLLECTOR_HOST, COLLECTOR_PORT),
+          estimate);
+      bspPeer.sync();
+
+      // Average with a running sum and count. Repeatedly computing
+      // (pi + value) / 2 would weight later messages more than earlier ones
+      // and also fold the initial 0.0 into the result.
+      double sum = 0.0;
+      int count = 0;
+      BSPMessage received;
+      while ((received = bspPeer.getCurrentMessage()) != null) {
+        sum += Bytes.toDouble(received.getData());
+        count++;
+      }
+
+      // Only a peer that actually received estimates reports a result.
+      if (count > 0) {
+        System.out.println("Estimated value of PI is " + (sum / count));
+      }
+    }
+
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+    }
+
+  }
+
+  /**
+   * Configures the example to run locally with 10 BSP tasks and submits it.
+   */
+  public static void main(String[] args) throws InterruptedException,
+      IOException {
+    // BSP job configuration
+    HamaConfiguration conf = new HamaConfiguration();
+    // Execute locally
+    conf.set("bsp.master.address", "local");
+
+    BSPJob bsp = new BSPJob(conf, PiEstimator.class);
+    // Set the job name
+    bsp.setJobName("pi estimation example");
+    bsp.setBspClass(MyEstimator.class);
+
+    bsp.setNumBspTask(10);
+    BSPJobClient.runJob(bsp);
+  }
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java?rev=958349&r1=958348&r2=958349&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java Sun Jun 27 08:16:20 2010
@@ -17,27 +17,15 @@
  */
 package org.apache.hama.bsp;
 
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.zookeeper.KeeperException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * This class provides an abstract implementation of the BSP interface
  */
 public abstract class BSP extends Thread implements BSPInterface {
-  private BSPPeer peer;
-
-  /**
-   * Constructor for abstract class
-   * 
-   * @param conf
-   * @throws IOException
-   */
-  public BSP(Configuration conf) throws IOException {
-    this.peer = new BSPPeer(conf);
-  }
-
+  private static final Log LOG = LogFactory.getLog(BSP.class);
+  
   /**
    * A thread's run method.
    * 
@@ -46,16 +34,9 @@ public abstract class BSP extends Thread
    */
   public void run() {
     try {
-      bsp(peer);
-    } catch (IOException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    } catch (KeeperException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      bsp(new BSPPeer(this.getConf()));
+    } catch (Exception e) {
+      LOG.error("BSP task " + getName() + " failed", e);
     }
   }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPInterface.java?rev=958349&r1=958348&r2=958349&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPInterface.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPInterface.java Sun Jun 27 08:16:20 2010
@@ -19,13 +19,14 @@ package org.apache.hama.bsp;
 
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configurable;
 import org.apache.zookeeper.KeeperException;
 
 /**
  * Interface BSP defines the basic operations needed to implement the BSP
  * algorithm.
  */
-public interface BSPInterface {
+public interface BSPInterface extends Configurable {
 
   /**
    * A user defined function for programming in the BSP style.

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java?rev=958349&r1=958348&r2=958349&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java Sun Jun 27 08:16:20 2010
@@ -24,8 +24,6 @@ import java.net.URLDecoder;
 import java.util.Enumeration;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hama.HamaConfiguration;
 
 public class BSPJob extends BSPJobContext {
@@ -51,6 +49,16 @@ public class BSPJob extends BSPJobContex
     jobClient = new BSPJobClient(conf);
   }
 
+  public BSPJob(BSPJobID jobID, String jobFile) throws IOException {
+     super(new Path(jobFile), jobID);
+  }
+
+  @SuppressWarnings("unchecked")
+  public BSPJob(HamaConfiguration conf, Class exampleClass) throws IOException {
+    this(conf);
+    setJarByClass(exampleClass);
+  }
+
   private void ensureState(JobState state) throws IllegalStateException {
     if (state != this.state) {
       throw new IllegalStateException("Job in state " + this.state
@@ -78,6 +86,11 @@ public class BSPJob extends BSPJobContex
     ensureState(JobState.DEFINE);
     conf.setClass(WORK_CLASS_ATTR, cls, BSP.class);
   }
+  
+  @SuppressWarnings("unchecked")
+  public Class<? extends BSP> getBspClass() {
+    return (Class<? extends BSP>) conf.getClass(WORK_CLASS_ATTR, BSP.class);
+  }
 
   public void setJar(String jar) {
     conf.set("bsp.jar", jar);
@@ -122,16 +135,6 @@ public class BSPJob extends BSPJobContex
         
   }
 
-  public void setInputFormat(Class<? extends InputFormat> cls) {
-    ensureState(JobState.DEFINE);
-    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, InputFormat.class);
-  }
-
-  public void setOutputFormat(Class<? extends OutputFormat> cls) {
-    ensureState(JobState.DEFINE);
-    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, OutputFormat.class);    
-  }
-
   public void setUser(String user) {
     conf.set("user.name", user);
   }
@@ -192,4 +195,9 @@ public class BSPJob extends BSPJobContex
   public void set(String name, String value) {
     conf.set(name, value);
   }
+
+  public void setNumBspTask(int tasks) {
+    conf.setInt("bsp.peers.num", tasks);
+  }
+
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=958349&r1=958348&r2=958349&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Sun Jun 27 08:16:20 2010
@@ -172,9 +172,6 @@ public class BSPJobClient extends Config
   final static FsPermission JOB_DIR_PERMISSION =
     FsPermission.createImmutable((short) 0777); // rwx-rwx-rwx
 
-  public BSPJobClient() {
-  }
-
   public BSPJobClient(Configuration conf) throws IOException {
     setConf(conf);
     init(conf);
@@ -182,8 +179,12 @@ public class BSPJobClient extends Config
 
   public void init(Configuration conf) throws IOException {
     // it will be used to determine if the bspmaster is running on local or not. 
-    //String tracker = conf.get("bsp.master.address", "local"); 
-    this.jobSubmitClient = createRPCProxy(BSPMaster.getAddress(conf), conf);
+    String master = conf.get("bsp.master.address", "local");
+    if ("local".equals(master)) {
+      this.jobSubmitClient = new LocalJobRunner(conf);
+    } else {
+      this.jobSubmitClient = createRPCProxy(BSPMaster.getAddress(conf), conf);
+    }
   }
 
   private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
@@ -305,7 +306,7 @@ public class BSPJobClient extends Config
   
   public 
   RunningJob submitJobInternal(BSPJob job) throws IOException {
-    BSPJobID jobId = jobSubmitClient.getNewJobId();    
+    BSPJobID jobId = jobSubmitClient.getNewJobId();
     Path submitJobDir = new Path(getSystemDir(), jobId.toString());
     Path submitJarFile = new Path(submitJobDir, "job.jar");    
     Path submitJobFile = new Path(submitJobDir, "job.xml");
@@ -365,7 +366,7 @@ public class BSPJobClient extends Config
     //
     // Now, actually submit the job (using the submit name)
     //
-    JobStatus status = jobSubmitClient.submitJob(jobId);
+    JobStatus status = jobSubmitClient.submitJob(jobId, submitJobFile.toString());
     if (status != null) {
       return new NetworkedJob(status);
     } else {
@@ -416,9 +417,10 @@ public class BSPJobClient extends Config
     return sysDir;
   }
 
-  public RunningJob runJob(BSPJob job) throws FileNotFoundException,
+  public static RunningJob runJob(BSPJob job) throws FileNotFoundException,
   IOException {
-    return submitJobInternal(job);    
+    BSPJobClient jc = new BSPJobClient(job.getConf());
+    return jc.submitJobInternal(job);    
   }
   
   /**

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=958349&r1=958348&r2=958349&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Sun Jun 27 08:16:20 2010
@@ -591,4 +591,10 @@ public class BSPMaster extends Thread im
   public void shutdown() {
     this.interTrackerServer.stop();
   }
+
+  @Override
+  public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException {
+    // TODO: submission with a job file is not implemented here yet; this stub always returns null.
+    return null;
+  }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=958349&r1=958348&r2=958349&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Sun Jun 27 08:16:20 2010
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentLi
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.Server;
@@ -167,7 +168,7 @@ public class BSPPeer implements Watcher,
     while (true) {
       synchronized (mutex) {
         List<String> list = zk.getChildren(bspRoot, true);
-        if (list.size() < Integer.valueOf(conf.get("bsp.peers.num"))) {
+        if (list.size() < conf.getInt("bsp.peers.num", 0)) {
           mutex.wait();
         } else {
           return true;

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java?rev=958349&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java Sun Jun 27 08:16:20 2010
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Thread that runs one user-defined {@link BSP} program against its own
+ * {@link BSPPeer}. Both are created from the configuration passed to
+ * {@link #setConf(Configuration)}.
+ */
+public class BSPRunner extends Thread implements Configurable {
+  private static final Log LOG = LogFactory.getLog(BSPRunner.class);
+  private BSPPeer bspPeer;
+  private Configuration conf;
+  private BSP bsp;
+
+  /**
+   * Executes the configured BSP program. Failures are logged, not rethrown.
+   */
+  @Override
+  public void run() {
+    if (bspPeer == null || bsp == null) {
+      // setConf() was never called or could not create the peer.
+      LOG.error("BSPRunner is not initialized; the BSP program will not run");
+      return;
+    }
+
+    try {
+      bsp.bsp(bspPeer);
+    } catch (InterruptedException e) {
+      // Keep the interrupt status set instead of silently clearing it.
+      Thread.currentThread().interrupt();
+      LOG.error("BSP program was interrupted", e);
+    } catch (Exception e) {
+      LOG.error("BSP program failed", e);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  /**
+   * Stores the configuration and creates the {@link BSPPeer} and the BSP
+   * program named by "bsp.work.class". If the peer cannot be created the
+   * error is logged and {@link #run()} will refuse to start the program.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    try {
+      this.bspPeer = new BSPPeer(conf);
+    } catch (IOException e) {
+      LOG.error("Could not create the BSP peer", e);
+    }
+
+    bsp = (BSP) ReflectionUtils.newInstance(conf.getClass("bsp.work.class",
+        BSP.class), conf);
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java?rev=958349&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java Sun Jun 27 08:16:20 2010
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.Constants;
+import org.apache.hama.ipc.InterTrackerProtocol;
+import org.apache.hama.ipc.JobSubmissionProtocol;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * A {@link JobSubmissionProtocol} implementation that runs a BSP job inside
+ * the submitting JVM, one {@link BSPRunner} thread per BSP task.
+ */
+public class LocalJobRunner implements JobSubmissionProtocol {
+  private static final Log LOG = LogFactory.getLog(LocalJobRunner.class);
+  /** Address of the ZooKeeper server used by locally running peers. */
+  private static final String ZOOKEEPER_ADDR = "localhost:21810";
+  /** Port of the first local peer; peer i listens on this port plus i. */
+  private static final int FIRST_PEER_PORT = 30000;
+
+  private FileSystem fs;
+  private Configuration conf;
+  private int nextJobId = 1;
+  private HashMap<String, Job> jobs = new HashMap<String, Job>();
+
+  public LocalJobRunner(Configuration conf) throws IOException {
+    this.fs = FileSystem.get(conf);
+    this.conf = conf;
+  }
+
+  @Override
+  public JobStatus[] getAllJobs() throws IOException {
+    // TODO not implemented for local execution
+    return null;
+  }
+
+  @Override
+  public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
+    // TODO not implemented for local execution
+    return null;
+  }
+
+  @Override
+  public String getFilesystemName() throws IOException {
+    return fs.getUri().toString();
+  }
+
+  @Override
+  public JobProfile getJobProfile(BSPJobID jobid) throws IOException {
+    // TODO not implemented for local execution
+    return null;
+  }
+
+  @Override
+  public JobStatus getJobStatus(BSPJobID jobid) throws IOException {
+    // TODO not implemented for local execution
+    return null;
+  }
+
+  @Override
+  public BSPJobID getNewJobId() throws IOException {
+    return new BSPJobID("local", nextJobId++);
+  }
+
+  @Override
+  public String getSystemDir() {
+    Path sysDir = new Path(conf.get("bsp.system.dir", "/tmp/hadoop/bsp/system"));
+    return fs.makeQualified(sysDir).toString();
+  }
+
+  @Override
+  public void killJob(BSPJobID jobid) throws IOException {
+    // TODO not implemented for local execution
+  }
+
+  @Override
+  public boolean killTask(TaskAttemptID taskId, boolean shouldFail)
+      throws IOException {
+    // TODO not implemented for local execution
+    return false;
+  }
+
+  /**
+   * Runs the job described by jobFile in this JVM. The call returns only
+   * after the runner threads of the job have terminated.
+   */
+  @Override
+  public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException {
+    return new Job(jobID, jobFile, this.conf).status;
+  }
+
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    if (protocol.equals(InterTrackerProtocol.class.getName())) {
+      return InterTrackerProtocol.versionID;
+    } else if (protocol.equals(JobSubmissionProtocol.class.getName())) {
+      return JobSubmissionProtocol.versionID;
+    } else {
+      throw new IOException("Unknown protocol to job tracker: " + protocol);
+    }
+  }
+
+  @Override
+  public JobStatus submitJob(BSPJobID jobName) throws IOException {
+    // TODO not implemented; use submitJob(BSPJobID, String) instead
+    return null;
+  }
+
+  /**
+   * A job executed locally. Constructing a Job starts one BSPRunner thread
+   * per BSP task and blocks until all of them have terminated.
+   */
+  private class Job implements Watcher {
+    private JobStatus status = new JobStatus();
+    private Configuration conf;
+    private int numPeers;
+    private BSPJob job;
+    private List<BSPRunner> list;
+
+    public Job(BSPJobID jobID, String jobFile, Configuration conf)
+        throws IOException {
+      this.conf = conf;
+      this.numPeers = conf.getInt("bsp.peers.num", 0);
+      LOG.info("LocalJobRunner: " + jobID + ", " + jobFile);
+      this.job = new BSPJob(jobID, jobFile);
+      LOG.info("Jar file: " + job.getJar());
+      LOG.info("Number of BSP tasks: " + numPeers);
+      jobs.put(jobID.toString(), this);
+
+      ensureZooKeeperRoot();
+      createRunners();
+      runAndWait();
+    }
+
+    /** Creates the ZooKeeper root node for BSP if it does not exist yet. */
+    private void ensureZooKeeperRoot() throws IOException {
+      ZooKeeper zk = new ZooKeeper(ZOOKEEPER_ADDR, 3000, this);
+      try {
+        Stat s = null;
+        try {
+          s = zk.exists(Constants.DEFAULT_ZOOKEEPER_ROOT, false);
+        } catch (KeeperException e) {
+          // Log the failure itself and still try to create the node.
+          LOG.error("Could not check the ZooKeeper root node", e);
+        }
+
+        if (s == null) {
+          try {
+            zk.create(Constants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
+                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+          } catch (KeeperException e) {
+            LOG.error("Could not create the ZooKeeper root node", e);
+          }
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOG.error("Interrupted while preparing the ZooKeeper root node", e);
+      } finally {
+        try {
+          zk.close();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+
+    /** Creates one configured BSPRunner per BSP task. */
+    private void createRunners() {
+      // Settings shared by all peers.
+      conf.setInt("bsp.peers.num", numPeers);
+      conf.set(Constants.PEER_HOST, "localhost");
+      conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, ZOOKEEPER_ADDR);
+      conf.setInt("NUM_PEER", numPeers);
+
+      list = new ArrayList<BSPRunner>();
+      for (int i = 0; i < numPeers; i++) {
+        // Each runner builds its BSPPeer from conf inside newInstance(),
+        // so the peer-specific port has to be in place before that call.
+        conf.set(Constants.PEER_PORT, String.valueOf(FIRST_PEER_PORT + i));
+        list.add((BSPRunner) ReflectionUtils.newInstance(BSPRunner.class,
+            conf));
+      }
+    }
+
+    /** Starts all runners and waits for them to finish. */
+    private void runAndWait() {
+      for (BSPRunner runner : list) {
+        runner.start();
+      }
+
+      for (BSPRunner runner : list) {
+        try {
+          runner.join();
+        } catch (InterruptedException e) {
+          // Stop waiting, but keep the interrupt visible to the caller.
+          Thread.currentThread().interrupt();
+          LOG.error("Interrupted while waiting for BSP tasks to finish", e);
+          break;
+        }
+      }
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+      // ZooKeeper events are not used by the local job.
+    }
+  }
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java?rev=958349&r1=958348&r2=958349&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java Sun Jun 27 08:16:20 2010
@@ -102,6 +102,9 @@ public interface JobSubmissionProtocol e
    * @param shouldFail if true the task is failed and added to failed tasks list, otherwise
    * it is just killed, w/o affecting job failure status.  
    */ 
-  public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException;  
+  public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException;
+
+
+  JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException;
   
 }

Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPPeerTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPPeerTest.java?rev=958349&r1=958348&r2=958349&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPPeerTest.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPPeerTest.java Sun Jun 27 08:16:20 2010
@@ -155,7 +155,7 @@ public class BSPPeerTest extends HamaClu
 
     BSPPeerThread thread;
     for (int i = 0; i < NUM_PEER; i++) {
-      conf.set("bsp.peers.num", String.valueOf(NUM_PEER));
+      conf.setInt("bsp.peers.num", NUM_PEER);
       conf.set(Constants.PEER_HOST, "localhost");
       conf.set(Constants.PEER_PORT, String.valueOf(30000 + i));
       conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");

Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/SerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/SerializePrinting.java?rev=958349&r1=958348&r2=958349&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/SerializePrinting.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/SerializePrinting.java Sun Jun 27 08:16:20 2010
@@ -60,7 +60,7 @@ public class SerializePrinting extends H
     BSPPeerThread thread;
     int[] randomSequence = new int[] { 2, 3, 4, 5, 0, 1, 6, 7, 8, 9 };
     for (int i = 0; i < NUM_PEER; i++) {
-      conf.set("bsp.peers.num", String.valueOf(NUM_PEER));
+      conf.setInt("bsp.peers.num", NUM_PEER);
       conf.set(Constants.PEER_HOST, "localhost");
       conf.set(Constants.PEER_PORT, String
           .valueOf(30000 + randomSequence[i]));