You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/07/26 04:14:07 UTC
svn commit: r979137 -
/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
Author: edwardyoon
Date: Mon Jul 26 02:14:07 2010
New Revision: 979137
URL: http://svn.apache.org/viewvc?rev=979137&view=rev
Log:
Add trivial comments.
Modified:
incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java?rev=979137&r1=979136&r2=979137&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java Mon Jul 26 02:14:07 2010
@@ -12,13 +12,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hama.Constants;
import org.apache.hama.ipc.InterTrackerProtocol;
import org.apache.hama.ipc.JobSubmissionProtocol;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
public class LocalJobRunner implements JobSubmissionProtocol {
private static final Log LOG = LogFactory.getLog(BSPJobClient.class);
@@ -123,27 +118,6 @@ public class LocalJobRunner implements J
LOG.info("Number of BSP tasks: " + NUM_PEER);
jobs.put(jobID.toString(), this);
- ZooKeeper zk = new ZooKeeper(Constants.DEFAULT_ZOOKEEPER_SERVER_ADDR,
- 3000, this);
- Stat s = null;
- if (zk != null) {
- try {
- s = zk.exists(Constants.DEFAULT_ZOOKEEPER_ROOT, false);
- } catch (Exception e) {
- LOG.error(s);
- }
-
- if (s == null) {
- try {
- zk.create(Constants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- } catch (KeeperException e) {
- LOG.error(e);
- } catch (InterruptedException e) {
- LOG.error(e);
- }
- }
- }
this.start();
}
@@ -151,10 +125,13 @@ public class LocalJobRunner implements J
while (!threadDone) {
TaskID tID;
for (int i = 0; i < NUM_PEER; i++) {
+ // TODO: this code should be automatically settled by BSP system
this.conf.set(Constants.PEER_PORT, String.valueOf(30000 + i));
this.conf.setInt(Constants.PEER_ID, i);
+ // Task ID is an integer that ranges from 0 to NUM_PEER - 1.
tID = new TaskID(job.getJobID(), false, i);
+ // bspRunner should be Runnable.
Task bspRunner = new BSPTask(job.getJobID().getJtIdentifier(), jobFile, tID.toString(), i, this.conf);
LOG.info("Adding task '" + tID.toString() + "' for '" + bspRunner.getName() + "'");
tasks.put(tID.toString(), bspRunner);
@@ -165,7 +142,7 @@ public class LocalJobRunner implements J
e.getValue().runner.start();
}
- // Barrier
+ // Slave Join
for (Map.Entry<String, Task> e : tasks.entrySet()) {
try {
e.getValue().join();