You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/08/02 13:18:20 UTC
svn commit: r981457 - in /incubator/hama/trunk/src:
examples/org/apache/hama/examples/ java/org/apache/hama/bsp/
Author: edwardyoon
Date: Mon Aug 2 11:18:20 2010
New Revision: 981457
URL: http://svn.apache.org/viewvc?rev=981457&view=rev
Log:
I just commit HAMA-276_v04.patch
Modified:
incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=981457&r1=981456&r2=981457&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Mon Aug 2 11:18:20 2010
@@ -20,6 +20,8 @@ package org.apache.hama.examples;
import java.io.IOException;
import java.net.InetSocketAddress;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hama.HamaConfiguration;
@@ -33,6 +35,7 @@ import org.apache.zookeeper.KeeperExcept
public class PiEstimator {
public static class MyEstimator extends BSP {
+ public static final Log LOG = LogFactory.getLog(MyEstimator.class);
private Configuration conf;
private static final int iterations = 10000;
@@ -53,18 +56,22 @@ public class PiEstimator {
byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
BSPMessage estimate = new BSPMessage(tagName, myData);
+ LOG.info("Sent message to localhost:30000: " + Bytes.toDouble(myData));
bspPeer.send(new InetSocketAddress("localhost", 30000), estimate);
+ LOG.info("Enter the barrier");
bspPeer.sync();
double pi = 0.0;
BSPMessage received;
while ((received = bspPeer.getCurrentMessage()) != null) {
+ LOG.info("Receives messages:" + Bytes.toDouble(received.getData()));
pi = (pi + Bytes.toDouble(received.getData())) / 2;
}
- if (pi != 0.0)
+ if (pi != 0.0) {
+ LOG.info("\nEstimated value of PI is " + pi);
System.out.println("\nEstimated value of PI is " + pi);
-
+ }
}
@Override
@@ -84,14 +91,15 @@ public class PiEstimator {
// BSP job configuration
HamaConfiguration conf = new HamaConfiguration();
// Execute locally
- conf.set("bsp.master.address", "local");
+ //conf.set("bsp.master.address", "local");
+ conf.set("bsp.master.address", "localhost:40000");
BSPJob bsp = new BSPJob(conf, PiEstimator.class);
// Set the job name
bsp.setJobName("pi estimation example");
bsp.setBspClass(MyEstimator.class);
- bsp.setNumBspTask(10);
+ bsp.setNumBspTask(1);
BSPJobClient.runJob(bsp);
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java?rev=981457&r1=981456&r2=981457&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java Mon Aug 2 11:18:20 2010
@@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFac
*/
public abstract class BSP extends Thread implements BSPInterface {
private static final Log LOG = LogFactory.getLog(BSP.class);
+ private BSPPeer bspPeer;
/**
* A thread's run method.
@@ -34,9 +35,13 @@ public abstract class BSP extends Thread
*/
public void run() {
try {
- bsp(new BSPPeer(this.getConf()));
+ bsp(bspPeer);
} catch (Exception e) {
LOG.error(e);
}
}
+
+ public void setPeer(BSPPeer bspServer) {
+ this.bspPeer = bspServer;
+ }
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=981457&r1=981456&r2=981457&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Mon Aug 2 11:18:20 2010
@@ -156,7 +156,7 @@ public class BSPPeer implements Watcher,
}
protected boolean enterBarrier() throws KeeperException, InterruptedException {
- LOG.debug("[" + serverName + "] enter the enterbarrier");
+ LOG.info("[" + serverName + "] enter the enterbarrier");
try {
zk.create(bspRoot + "/" + serverName, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
@@ -165,13 +165,17 @@ public class BSPPeer implements Watcher,
} catch (InterruptedException e) {
e.printStackTrace();
}
-
+ LOG.info("enterbarrier done 0");
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(bspRoot, true);
+ LOG.info(list.size() + ", " + conf.getInt("bsp.peers.num", 0));
+
if (list.size() < conf.getInt("bsp.peers.num", 0)) {
+ LOG.info("enterbarrier done 1");
mutex.wait();
} else {
+ LOG.info("enterbarrier done 2");
return true;
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java?rev=981457&r1=981456&r2=981457&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java Mon Aug 2 11:18:20 2010
@@ -1,7 +1,5 @@
package org.apache.hama.bsp;
-import java.io.IOException;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
@@ -10,12 +8,11 @@ import org.apache.hadoop.util.Reflection
public class BSPRunner extends Thread implements Configurable {
private static final Log LOG = LogFactory.getLog(BSPRunner.class);
- private BSPPeer bspPeer;
private Configuration conf;
private BSP bsp;
private boolean isDone;
- public void run() {
+ public void run(BSPPeer bspPeer) {
try {
bsp.bsp(bspPeer);
} catch (Exception e) {
@@ -31,11 +28,6 @@ public class BSPRunner extends Thread im
@Override
public void setConf(Configuration conf) {
this.conf = conf;
- try {
- this.bspPeer = new BSPPeer(conf);
- } catch (IOException e) {
- e.printStackTrace();
- }
bsp = (BSP) ReflectionUtils.newInstance(conf.getClass("bsp.work.class",
BSP.class), conf);
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java?rev=981457&r1=981456&r2=981457&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java Mon Aug 2 11:18:20 2010
@@ -4,14 +4,22 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.util.ReflectionUtils;
public class BSPTask extends Task {
+ private BSP bsp;
+ private Configuration conf;
public BSPTask(String jobId, String jobFile, String taskid, int partition, Configuration conf) {
this.jobId = jobId;
this.jobFile = jobFile;
this.taskId = taskid;
this.partition = partition;
- this.runner = (BSPRunner) ReflectionUtils.newInstance(
- BSPRunner.class, conf);
+ this.conf = conf;
+ }
+
+ public BSP getBSPClass() {
+ bsp = (BSP) ReflectionUtils.newInstance(conf.getClass("bsp.work.class",
+ BSP.class), conf);
+
+ return bsp;
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=981457&r1=981456&r2=981457&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Mon Aug 2 11:18:20 2010
@@ -40,13 +40,17 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.ipc.InterTrackerProtocol;
public class GroomServer implements Runnable {
public static final Log LOG = LogFactory.getLog(GroomServer.class);
+ private BSPPeer bspPeer;
+ private Task task;
static {
Configuration.addDefaultResource("hama-default.xml");
@@ -92,9 +96,14 @@ public class GroomServer implements Runn
new LinkedBlockingQueue<GroomServerAction>();
public GroomServer(Configuration conf) throws IOException {
+ LOG.info("groom start");
this.conf = conf;
- bspMasterAddr = BSPMaster.getAddress(conf);
-
+ String mode = conf.get("bsp.master.address");
+ if(!mode.equals("local")) {
+ bspMasterAddr = BSPMaster.getAddress(conf);
+ }
+ bspPeer = new BSPPeer(conf);
+
//FileSystem local = FileSystem.getLocal(conf);
//this.localDirAllocator = new LocalDirAllocator("bsp.local.dir");
}
@@ -214,7 +223,6 @@ public class GroomServer implements Runn
heartbeatResponse.getResponseId() + " and " +
((actions != null) ? actions.length : 0) + " actions");
-
if (actions != null){
for(GroomServerAction action: actions) {
if (action instanceof LaunchTaskAction) {
@@ -260,9 +268,10 @@ public class GroomServer implements Runn
private void startNewTask(LaunchTaskAction action) {
// TODO Auto-generated method stub
- Task t = action.getTask();
- LOG.info("GroomServer: " + t.getJobID() + ", " + t.getJobFile() + ", " + t.getId() + ", " + t.getPartition());
- LOG.info(t.runner);
+ task = action.getTask();
+ this.launchTask();
+ //LOG.info("GroomServer: " + t.getJobID() + ", " + t.getJobFile() + ", " + t.getId() + ", " + t.getPartition());
+ //LOG.info(t.runner);
//t.runner.start();
// TODO: execute task
@@ -352,7 +361,7 @@ public class GroomServer implements Runn
public synchronized void close() throws IOException {
this.running = false;
-
+ bspPeer.close();
cleanupStorage();
// shutdown RPC connections
@@ -450,6 +459,7 @@ public class GroomServer implements Runn
conf.addResource(new Path("/home/edward/workspace/hama-trunk/conf/hama-default.xml"));
conf.addResource(new Path("/home/edward/workspace/hama-trunk/conf/hama-site.xml"));
+ conf.set(Constants.PEER_PORT, String.valueOf(30000));
GroomServer groom = GroomServer.constructGroomServer(GroomServer.class, conf);
startGroomServer(groom);
} catch (Throwable e) {
@@ -457,5 +467,30 @@ public class GroomServer implements Runn
System.exit(-1);
}
}
-
+
+ public void assignTask(Task task) {
+ this.task = task;
+ }
+
+ public void launchTask() {
+ Configuration jobConf = new Configuration();
+ jobConf.addResource(new Path(task.getJobFile().replace("file:", "")));
+ BSP bsp = (BSP) ReflectionUtils.newInstance(jobConf.getClass("bsp.work.class",
+ BSP.class), conf);
+ bsp.setPeer(bspPeer);
+ bsp.start();
+ while(!bsp.isAlive()) {
+ LOG.info("i'm done. ");
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public String getServerName() {
+ return bspPeer.getServerName();
+ }
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java?rev=981457&r1=981456&r2=981457&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java Mon Aug 2 11:18:20 2010
@@ -141,7 +141,7 @@ class JobInProgress {
try {
result = new TaskInProgress(getJobID(), this.jobFile.toString(), this.master, null, this,
numUniqueHosts).getTaskToRun(status);
- LOG.info("JobInProgress: " + result.getJobID() + ", " + result.getJobFile() + ", " + result.getId() + ", " + result.getPartition());
+ //LOG.info("JobInProgress: " + result.getJobID() + ", " + result.getJobFile() + ", " + result.getId() + ", " + result.getPartition());
} catch (IOException e) {
e.printStackTrace();
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java?rev=981457&r1=981456&r2=981457&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java Mon Aug 2 11:18:20 2010
@@ -2,7 +2,6 @@ package org.apache.hama.bsp;
import java.io.IOException;
import java.util.HashMap;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -12,8 +11,13 @@ import org.apache.hadoop.fs.Path;
import org.apache.hama.Constants;
import org.apache.hama.ipc.InterTrackerProtocol;
import org.apache.hama.ipc.JobSubmissionProtocol;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
public class LocalJobRunner implements JobSubmissionProtocol {
private static final Log LOG = LogFactory.getLog(BSPJobClient.class);
@@ -106,7 +110,6 @@ public class LocalJobRunner implements J
private BSPJob job;
private String jobFile;
private boolean threadDone = false;
- private HashMap<String, Task> tasks = new HashMap<String, Task>();
public Job(BSPJobID jobID, String jobFile, Configuration conf)
throws IOException {
@@ -118,6 +121,26 @@ public class LocalJobRunner implements J
LOG.info("Number of BSP tasks: " + NUM_PEER);
jobs.put(jobID.toString(), this);
+ ZooKeeper zk = new ZooKeeper("localhost:21810", 3000, this);
+ Stat s = null;
+ if (zk != null) {
+ try {
+ s = zk.exists(Constants.DEFAULT_ZOOKEEPER_ROOT, false);
+ } catch (Exception e) {
+ LOG.error(s);
+ }
+
+ if (s == null) {
+ try {
+ zk.create(Constants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch (KeeperException e) {
+ LOG.error(e);
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ }
+ }
+ }
this.start();
}
@@ -125,31 +148,23 @@ public class LocalJobRunner implements J
while (!threadDone) {
TaskID tID;
for (int i = 0; i < NUM_PEER; i++) {
- // TODO: this code should be automatically settled by BSP system
this.conf.set(Constants.PEER_PORT, String.valueOf(30000 + i));
this.conf.setInt(Constants.PEER_ID, i);
// Task ID is an integer that ranges from 0 to NUM_PEER - 1.
tID = new TaskID(job.getJobID(), false, i);
- // bspRunner should be Runnable.
- Task bspRunner = new BSPTask(job.getJobID().getJtIdentifier(), jobFile, tID.toString(), i, this.conf);
- LOG.info("Adding task '" + tID.toString() + "' for '" + bspRunner.getName() + "'");
- tasks.put(tID.toString(), bspRunner);
- }
-
- // Launching tasks
- for (Map.Entry<String, Task> e : tasks.entrySet()) {
- e.getValue().runner.start();
- }
-
- // Slave Join
- for (Map.Entry<String, Task> e : tasks.entrySet()) {
try {
- e.getValue().join();
- } catch (InterruptedException e1) {
- e1.printStackTrace();
+ GroomServer servers = new GroomServer(conf);
+ Task task = new BSPTask(job.getJobID().getJtIdentifier(), jobFile, tID.toString(), i, this.conf);
+ servers.assignTask(task);
+ LOG.info("Adding task '" + tID.toString() + "' for '" + servers.getServerName() + "'");
+
+ servers.launchTask();
+ } catch (IOException e) {
+ e.printStackTrace();
}
}
+
done();
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=981457&r1=981456&r2=981457&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java Mon Aug 2 11:18:20 2010
@@ -82,7 +82,7 @@ class SimpleTaskScheduler extends TaskSc
t = job.obtainNewTask(groomStatus, numGroomServers,
groomServerManager.getNumberOfUniqueHosts());
- LOG.info("SimpleTaskScheduler: " + t.getJobID() + ", " + t.getJobFile() + ", " + t.getId() + ", " + t.getPartition());
+ //LOG.info("SimpleTaskScheduler: " + t.getJobID() + ", " + t.getJobFile() + ", " + t.getId() + ", " + t.getPartition());
if (t != null) {
assignedTasks.add(t);
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=981457&r1=981456&r2=981457&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Mon Aug 2 11:18:20 2010
@@ -30,7 +30,7 @@ import org.apache.hadoop.io.Writable;
/**
*
*/
-public class Task extends Thread implements Writable {
+public class Task implements Writable {
public static final Log LOG = LogFactory.getLog(Task.class);
////////////////////////////////////////////
// Fields
@@ -41,11 +41,8 @@ public class Task extends Thread impleme
protected String taskId;
protected int partition;
- protected BSPRunner runner;
protected LocalDirAllocator lDirAlloc;
- /**
- *
- */
+
public Task() {
taskId = new String();
}
@@ -54,7 +51,6 @@ public class Task extends Thread impleme
this.jobId = jobId;
this.jobFile = jobFile;
this.taskId = taskId;
-
this.partition = partition;
}