You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/07/01 07:25:55 UTC
svn commit: r959515 - in /incubator/hama/trunk: ./
src/examples/org/apache/hama/examples/ src/java/org/apache/hama/
src/java/org/apache/hama/bsp/ src/java/org/apache/hama/ipc/
src/java/org/apache/hama/util/
Author: edwardyoon
Date: Thu Jul 1 05:25:54 2010
New Revision: 959515
URL: http://svn.apache.org/viewvc?rev=959515&view=rev
Log:
Add "serialize printing" to examples
Added:
incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
incubator/hama/trunk/src/java/org/apache/hama/Constants.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/KillJobAction.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskScheduler.java
incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java
incubator/hama/trunk/src/java/org/apache/hama/util/ClusterUtil.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu Jul 1 05:25:54 2010
@@ -4,6 +4,7 @@ Trunk (unreleased changes)
NEW FEATURES
+ HAMA-279: Add "serialize printing" to examples (edwardyoon)
HAMA-272: Hama/Zookeeper Integration (edwardyoon)
HAMA-265: Add example Pi estimatior based on BSP (edwardyoon)
HAMA-266: Add Ant target to generate the UML class diagrams (edwardyoon)
Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Thu Jul 1 05:25:54 2010
@@ -29,7 +29,6 @@ import org.apache.hama.bsp.BSPJobClient;
import org.apache.hama.bsp.BSPMessage;
import org.apache.hama.bsp.BSPPeer;
import org.apache.zookeeper.KeeperException;
-import org.mortbay.log.Log;
public class PiEstimator {
@@ -54,8 +53,6 @@ public class PiEstimator {
byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
BSPMessage estimate = new BSPMessage(tagName, myData);
- Log.info("Send a message (" + Bytes.toDouble(myData) + ") from "
- + bspPeer.getServerName() + " to localhost:30000");
bspPeer.send(new InetSocketAddress("localhost", 30000), estimate);
bspPeer.sync();
@@ -88,7 +85,7 @@ public class PiEstimator {
HamaConfiguration conf = new HamaConfiguration();
// Execute locally
conf.set("bsp.master.address", "local");
-
+
BSPJob bsp = new BSPJob(conf, PiEstimator.class);
// Set the job name
bsp.setJobName("pi estimation example");
Added: incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java?rev=959515&view=auto
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java (added)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java Thu Jul 1 05:25:54 2010
@@ -0,0 +1,62 @@
+package org.apache.hama.examples;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPJobClient;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.zookeeper.KeeperException;
+
+public class SerializePrinting {
+
+ public static class HelloBSP extends BSP {
+ private Configuration conf;
+
+ @Override
+ public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
+ InterruptedException {
+ int num = Integer.parseInt(conf.get("bsp.peers.num"));
+
+ for (int i = 0; i < num; i++) {
+ if (bspPeer.getId() == i) {
+ System.out.println("Hello BSP from " + i + " of " + num + ": "
+ + bspPeer.getServerName());
+ }
+
+ Thread.sleep(100);
+ bspPeer.sync();
+ }
+
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ }
+
+ public static void main(String[] args) throws InterruptedException,
+ IOException {
+ // BSP job configuration
+ HamaConfiguration conf = new HamaConfiguration();
+ // Execute locally
+ conf.set("bsp.master.address", "local");
+
+ BSPJob bsp = new BSPJob(conf, SerializePrinting.class);
+ // Set the job name
+ bsp.setJobName("serialize printing");
+ bsp.setBspClass(HelloBSP.class);
+
+ bsp.setNumBspTask(5);
+ BSPJobClient.runJob(bsp);
+ }
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Thu Jul 1 05:25:54 2010
@@ -139,4 +139,5 @@ public interface Constants {
public static final String BLOCK = "block";
public static final Text ROWCOUNT= new Text("row");
+ public static final String PEER_ID = "bsp.peer.id";
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Thu Jul 1 05:25:54 2010
@@ -19,7 +19,6 @@ package org.apache.hama.bsp;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.net.InetSocketAddress;
import javax.security.auth.login.LoginException;
@@ -35,7 +34,6 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hama.bsp.BSPMaster;
import org.apache.hama.ipc.JobSubmissionProtocol;
public class BSPJobClient extends Configured {
@@ -183,17 +181,12 @@ public class BSPJobClient extends Config
if ("local".equals(master)) {
this.jobSubmitClient = new LocalJobRunner(conf);
} else {
- this.jobSubmitClient = createRPCProxy(BSPMaster.getAddress(conf), conf);
+ this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
+ JobSubmissionProtocol.versionID, BSPMaster.getAddress(conf), conf, NetUtils.getSocketFactory(
+ conf, JobSubmissionProtocol.class));
}
}
- private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
- Configuration conf) throws IOException {
- return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
- JobSubmissionProtocol.versionID, addr, conf, NetUtils.getSocketFactory(
- conf, JobSubmissionProtocol.class));
- }
-
/**
* Close the <code>JobClient</code>.
*/
@@ -349,7 +342,7 @@ public class BSPJobClient extends Config
job.set("group.name", ugi.getGroupNames()[0]);
}
if (job.getWorkingDirectory() == null) {
- job.setWorkingDirectory(fs.getWorkingDirectory());
+ job.setWorkingDirectory(fs.getWorkingDirectory());
}
// Write job file to BSPMaster's fs
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Thu Jul 1 05:25:54 2010
@@ -25,9 +25,14 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -49,7 +54,7 @@ import org.apache.hama.ipc.JobSubmission
* BSPMaster is responsible to control all the groom servers and to manage bsp
* jobs.
*/
-public class BSPMaster extends Thread implements JobSubmissionProtocol, InterTrackerProtocol,
+public class BSPMaster implements JobSubmissionProtocol, InterTrackerProtocol,
GroomServerManager {
static {
@@ -67,14 +72,15 @@ public class BSPMaster extends Thread im
}
private static final int FS_ACCESS_RETRY_PERIOD = 10000;
-
+ public static final long GROOMSERVER_EXPIRY_INTERVAL = 10 * 60 * 1000;
+
// States
State state = State.INITIALIZING;
// Attributes
String masterIdentifier;
private Server interTrackerServer;
-
+
// Filesystem
static final String SUBDIR = "bspMaster";
FileSystem fs = null;
@@ -100,10 +106,81 @@ public class BSPMaster extends Thread im
private Map<BSPJobID, JobInProgress> jobs = new TreeMap<BSPJobID, JobInProgress>();
private TaskScheduler taskScheduler;
- /*
- * private final List<JobInProgressListener> jobInProgressListeners = new
- * CopyOnWriteArrayList<JobInProgressListener>();
- */
+ ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();
+ Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks,
+ "expireLaunchingTasks");
+
+ private class ExpireLaunchingTasks implements Runnable {
+ private volatile boolean shouldRun = true;
+ private Map<String, Long> launchingTasks = new LinkedHashMap<String, Long>();
+
+ @Override
+ public void run() {
+ while (shouldRun) {
+ long now = System.currentTimeMillis();
+
+ synchronized (BSPMaster.this) {
+ synchronized (launchingTasks) {
+ Iterator<Entry<String, Long>> itr = launchingTasks
+ .entrySet().iterator();
+ while (itr.hasNext()) {
+ Map.Entry<String, Long> pair = itr.next();
+ String taskId = pair.getKey();
+ long age = now - ((Long) pair.getValue()).longValue();
+ LOG.debug(taskId + " is " + age + " ms debug.");
+
+ LOG.info(taskId);
+ if (age > GROOMSERVER_EXPIRY_INTERVAL) {
+ LOG.info("Launching task " + taskId + " timed out.");
+ TaskInProgress tip = null;
+ tip = (TaskInProgress) taskidToTIPMap.get(taskId);
+ if (tip != null) {
+ JobInProgress job = tip.getJob();
+ String groomName = getAssignedTracker(taskId);
+ GroomServerStatus trackerStatus =
+ getGroomServer(groomName);
+ // This might happen when the tasktracker has already
+ // expired and this thread tries to call failedtask
+ // again. expire tasktracker should have called failed
+ // task!
+ if (trackerStatus != null) {
+ /*
+ job.failedTask(tip, taskId, "Error launching task",
+ tip.isMapTask()? TaskStatus.Phase.MAP:
+ TaskStatus.Phase.STARTING,
+ trackerStatus.getHost(), trackerName,
+ myMetrics);
+ */
+ }
+ }
+ itr.remove();
+ } else {
+ // the tasks are sorted by start time, so once we find
+ // one that we want to keep, we are done for this cycle.
+ break;
+ }
+
+ }
+ }
+ }
+ }
+ }
+
+ private String getAssignedTracker(String taskId) {
+ return taskidToTrackerMap.get(taskId);
+ }
+
+ public void addNewTask(String string) {
+ synchronized (launchingTasks) {
+ launchingTasks.put(string, new Long(System.currentTimeMillis()));
+ }
+ }
+
+ public void stop() {
+ shouldRun = false;
+ }
+
+ }
/**
* Start the BSPMaster process, listen on the indicated hostname/port
@@ -116,6 +193,7 @@ public class BSPMaster extends Thread im
InterruptedException {
this.conf = conf;
this.masterIdentifier = identifier;
+ //expireLaunchingTaskThread.start();
// Create the scheduler
Class<? extends TaskScheduler> schedulerClass = conf.getClass(
@@ -228,8 +306,8 @@ public class BSPMaster extends Thread im
return conf.getLocalPath("bsp.local.dir", pathString);
}
- public BSPMaster startMaster() throws IOException,
- InterruptedException {
+ public static BSPMaster startMaster(HamaConfiguration conf)
+ throws IOException, InterruptedException {
return startTracker(conf, generateNewIdentifier());
}
@@ -238,6 +316,7 @@ public class BSPMaster extends Thread im
BSPMaster result = null;
result = new BSPMaster(conf, identifier);
+ result.taskScheduler.setGroomServerManager(result);
return result;
}
@@ -279,37 +358,6 @@ public class BSPMaster extends Thread im
LOG.info("Stopped interTrackerServer");
}
- public static void main(String[] args) {
- StringUtils.startupShutdownMessage(BSPMaster.class, args, LOG);
- if (args.length != 0) {
- System.out.println("usage: HamaMaster");
- System.exit(-1);
- }
-
- try {
- HamaConfiguration conf = new HamaConfiguration();
- conf.set("bsp.local.dir", conf.get("hama.tmp.dir") + "/bsp/local");
-
- BSPMaster master = BSPMaster.constructMaster(BSPMaster.class, conf);
- master.start();
- } catch (Throwable e) {
- LOG.fatal(StringUtils.stringifyException(e));
- System.exit(-1);
- }
- }
-
- public void run() {
- try {
- offerService();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
// //////////////////////////////////////////////////
// GroomServerManager
// //////////////////////////////////////////////////
@@ -330,8 +378,7 @@ public class BSPMaster extends Thread im
@Override
public JobInProgress getJob(BSPJobID jobid) {
- // TODO Auto-generated method stub
- return null;
+ return jobs.get(jobid);
}
@Override
@@ -343,13 +390,12 @@ public class BSPMaster extends Thread im
@Override
public int getNumberOfUniqueHosts() {
// TODO Auto-generated method stub
- return 0;
+ return 1;
}
@Override
public Collection<GroomServerStatus> grooms() {
- // TODO Auto-generated method stub
- return null;
+ return groomServers.values();
}
@Override
@@ -387,11 +433,6 @@ public class BSPMaster extends Thread im
public HeartbeatResponse heartbeat(GroomServerStatus status,
boolean restarted, boolean initialContact, boolean acceptNewTasks,
short responseId) throws IOException {
- LOG.debug(">>> Received the heartbeat message from ");
- LOG.debug(">>> " + status.groomName + "(" + status.getHost() + ")");
- LOG.debug(">>> restarted:" + restarted + ",first:" + initialContact);
- LOG.debug(">>> maxTaskCapacity:" + status.getMaxTasks() + ",taskCapacity:"
- + status.getTaskReports().size());
// First check if the last heartbeat response got through
String groomName = status.getGroomName();
@@ -412,7 +453,7 @@ public class BSPMaster extends Thread im
}
HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
- // List<GroomServerAction> actions = new ArrayList<GroomServerAction>();
+ List<GroomServerAction> actions = new ArrayList<GroomServerAction>();
// Check for new tasks to be executed on the groom server
if (acceptNewTasks) {
@@ -420,19 +461,102 @@ public class BSPMaster extends Thread im
if (groomStatus == null) {
LOG.warn("Unknown task tracker polling; ignoring: " + groomName);
} else {
- // TODO - assignTasks should be implemented
- /*
- * List<Task> tasks = taskScheduler.assignTasks(groomStatus); for(Task
- * task : tasks) { if(tasks != null) { LOG.debug(groomName +
- * "-> LaunchTask: " + task.getTaskID()); actions.add(new
- * LaunchTaskAction(task)); } }
- */
+ LOG.info(groomStatus);
+ List<Task> tasks = taskScheduler.assignTasks(groomStatus);
+ for (Task task : tasks) {
+ if (tasks != null) {
+ expireLaunchingTasks.addNewTask(task.getTaskID());
+ actions.add(new LaunchTaskAction(task));
+ }
+ }
}
}
+ // Check for tasks to be killed
+ List<GroomServerAction> killTasksList = getTasksToKill(groomName);
+ if (killTasksList != null) {
+ actions.addAll(killTasksList);
+ }
+
+ response.setActions(actions.toArray(new GroomServerAction[actions.size()]));
+
+ groomToHeartbeatResponseMap.put(groomName, response);
+ removeMarkedTasks(groomName);
+
return response;
}
+
+ // (trackerID -> TreeSet of completed taskids running at that tracker)
+ TreeMap<String, Set<String>> trackerToMarkedTasksMap = new TreeMap<String, Set<String>>();
+
+ private void removeMarkedTasks(String groomName) {
+ // Purge all the 'marked' tasks which were running at taskTracker
+ TreeSet<String> markedTaskSet =
+ (TreeSet<String>) trackerToMarkedTasksMap.get(groomName);
+ if (markedTaskSet != null) {
+ for (String taskid : markedTaskSet) {
+ removeTaskEntry(taskid);
+ LOG.info("Removed completed task '" + taskid + "' from '" +
+ groomName + "'");
+ }
+ // Clear
+ trackerToMarkedTasksMap.remove(groomName);
+ }
+ }
+
+ private void removeTaskEntry(String taskid) {
+ // taskid --> tracker
+ String tracker = taskidToTrackerMap.remove(taskid);
+
+ // tracker --> taskid
+ if (tracker != null) {
+ TreeSet trackerSet = (TreeSet) trackerToTaskMap.get(tracker);
+ if (trackerSet != null) {
+ trackerSet.remove(taskid);
+ }
+ }
+
+ // taskid --> TIP
+ taskidToTIPMap.remove(taskid);
+
+ LOG.debug("Removing task '" + taskid + "'");
+ }
+
+ private List<GroomServerAction> getTasksToKill(String groomName) {
+ Set<String> taskIds = (TreeSet<String>) trackerToTaskMap.get(groomName);
+ if (taskIds != null) {
+ List<GroomServerAction> killList = new ArrayList<GroomServerAction>();
+ Set<String> killJobIds = new TreeSet<String>();
+ for (String killTaskId : taskIds ) {
+ TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(killTaskId);
+ if (tip.shouldCloseForClosedJob(killTaskId)) {
+ //
+ // This is how the JobTracker ends a task at the TaskTracker.
+ // It may be successfully completed, or may be killed in
+ // mid-execution.
+ //
+ if (tip.getJob().getStatus().getRunState() == JobStatus.RUNNING) {
+ killList.add(new KillTaskAction(killTaskId));
+ LOG.debug(groomName + " -> KillTaskAction: " + killTaskId);
+ } else {
+ String killJobId = tip.getJob().getStatus().getJobID().getJtIdentifier();
+ killJobIds.add(killJobId);
+ }
+ }
+ }
+
+ for (String killJobId : killJobIds) {
+ killList.add(new KillJobAction(killJobId));
+ LOG.debug(groomName + " -> KillJobAction: " + killJobId);
+ }
+
+ return killList;
+ }
+ return null;
+
+ }
+
/**
* Process incoming heartbeat messages from the groom.
*/
@@ -463,17 +587,15 @@ public class BSPMaster extends Thread im
}
@Override
- public JobStatus submitJob(BSPJobID jobId) throws IOException {
- LOG.info("Submitted a job (" + jobId + ")");
- if (jobs.containsKey(jobId)) {
+ public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException {
+ if (jobs.containsKey(jobID)) {
// job already running, don't start twice
- LOG.info("The job (" + jobId + ") is already subbmitted");
- return jobs.get(jobId).getStatus();
+ LOG.info("The job (" + jobID + ") is already subbmitted");
+ return jobs.get(jobID).getStatus();
}
- JobInProgress job = new JobInProgress(jobId, this, this.conf);
-
- return addJob(jobId, job);
+ JobInProgress job = new JobInProgress(jobID, this, this.conf);
+ return addJob(jobID, job);
}
@Override
@@ -502,7 +624,6 @@ public class BSPMaster extends Thread im
jobs.put(job.getProfile().getJobID(), job);
taskScheduler.addJob(job);
}
-
return job.getStatus();
}
@@ -589,9 +710,56 @@ public class BSPMaster extends Thread im
this.interTrackerServer.stop();
}
- @Override
- public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException {
- // TODO Auto-generated method stub
- return null;
+ TreeMap<String, String> taskidToTrackerMap = new TreeMap<String, String>();
+ TreeMap<String, TreeSet<String>> trackerToTaskMap = new TreeMap<String, TreeSet<String>>();
+ Map<String, TaskInProgress> taskidToTIPMap = new TreeMap<String, TaskInProgress>();
+
+ public void createTaskEntry(String taskid, String groomServer,
+ TaskInProgress taskInProgress) {
+ LOG.info("Adding task '" + taskid + "' to tip " + taskInProgress.getTIPId() + ", for tracker '" + groomServer + "'");
+ /*
+ // taskid --> groom
+ taskidToTrackerMap.put(taskid, groomServer);
+ // groom --> taskid
+ TreeSet<String> taskset = null;
+ if(trackerToTaskMap.entrySet().size() > 0) {
+ taskset = trackerToTaskMap.get(groomServer);
+ LOG.info(taskset.size());
+ LOG.info(taskset.size());
+ LOG.info(taskset.size());
+ }
+
+ if (taskset == null) {
+ taskset = new TreeSet<String>();
+ trackerToTaskMap.put(groomServer, taskset);
+ }
+ taskset.add(taskid);
+ taskidToTIPMap.put(taskid, taskInProgress);
+
+ LOG.info("" + taskidToTrackerMap);
+ LOG.info("" + taskidToTIPMap);
+ */
+ }
+
+ public static void main(String[] args) {
+ StringUtils.startupShutdownMessage(BSPMaster.class, args, LOG);
+ if (args.length != 0) {
+ System.out.println("usage: HamaMaster");
+ System.exit(-1);
+ }
+
+ try {
+ HamaConfiguration conf = new HamaConfiguration();
+ conf.addResource(new Path(
+ "/home/edward/workspace/hama-trunk/conf/hama-default.xml"));
+ conf.addResource(new Path(
+ "/home/edward/workspace/hama-trunk/conf/hama-site.xml"));
+
+ BSPMaster master = startMaster(conf);
+ master.offerService();
+ } catch (Throwable e) {
+ LOG.fatal(StringUtils.stringifyException(e));
+ System.exit(-1);
+ }
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Thu Jul 1 05:25:54 2010
@@ -58,7 +58,8 @@ public class BSPPeer implements Watcher,
protected final Map<InetSocketAddress, BSPPeerInterface> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeerInterface>();
protected final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
protected final ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
-
+ protected int id;
+
/**
*
*/
@@ -69,6 +70,7 @@ public class BSPPeer implements Watcher,
+ ":" + conf.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
bindAddress = conf.get(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
bindPort = conf.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
+ id = conf.getInt(Constants.PEER_ID, 0);
bspRoot = conf.get(Constants.ZOOKEEPER_ROOT,
Constants.DEFAULT_ZOOKEEPER_ROOT);
zookeeperAddr = conf.get(Constants.ZOOKEEPER_SERVER_ADDRS,
@@ -242,4 +244,9 @@ public class BSPPeer implements Watcher,
public String getServerName() {
return this.serverName;
}
+
+ public int getId() {
+ return this.id;
+ }
+
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java Thu Jul 1 05:25:54 2010
@@ -13,7 +13,8 @@ public class BSPRunner extends Thread im
private BSPPeer bspPeer;
private Configuration conf;
private BSP bsp;
-
+ private boolean isDone;
+
public void run() {
try {
bsp.bsp(bspPeer);
@@ -39,4 +40,8 @@ public class BSPRunner extends Thread im
bsp = (BSP) ReflectionUtils.newInstance(conf.getClass("bsp.work.class",
BSP.class), conf);
}
+
+ public boolean isDone() {
+ return this.isDone;
+ }
}
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java?rev=959515&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java Thu Jul 1 05:25:54 2010
@@ -0,0 +1,17 @@
+package org.apache.hama.bsp;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class BSPTask extends Task {
+
+ public BSPTask(String jobId, String jobFile, String taskid, int partition, Configuration conf) {
+ this.jobId = jobId;
+ this.jobFile = jobFile;
+ this.taskId = taskid;
+ this.partition = partition;
+ this.runner = (BSPRunner) ReflectionUtils.newInstance(
+ BSPRunner.class, conf);
+ }
+
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Thu Jul 1 05:25:54 2010
@@ -27,6 +27,8 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -86,6 +88,9 @@ public class GroomServer implements Runn
Map<TaskAttemptID, TaskInProgress> runningTasks = null;
Map<BSPJobID, RunningJob> runningJobs = null;
+ private BlockingQueue<GroomServerAction> tasksToCleanup =
+ new LinkedBlockingQueue<GroomServerAction>();
+
public GroomServer(Configuration conf) throws IOException {
this.conf = conf;
bspMasterAddr = BSPMaster.getAddress(conf);
@@ -203,6 +208,23 @@ public class GroomServer implements Runn
// Send the heartbeat and process the bspmaster's directives
HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
+
+ GroomServerAction[] actions = heartbeatResponse.getActions();
+ LOG.info("Got heartbeatResponse from BSPMaster with responseId: " +
+ heartbeatResponse.getResponseId() + " and " +
+ ((actions != null) ? actions.length : 0) + " actions");
+
+
+ if (actions != null){
+ for(GroomServerAction action: actions) {
+ if (action instanceof LaunchTaskAction) {
+ startNewTask((LaunchTaskAction) action);
+ } else {
+ tasksToCleanup.put(action);
+ }
+ }
+ }
+
//
// The heartbeat got through successfully!
//
@@ -236,8 +258,17 @@ public class GroomServer implements Runn
return State.NORMAL;
}
- private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
+ private void startNewTask(LaunchTaskAction action) {
+ // TODO Auto-generated method stub
+ Task t = action.getTask();
+ LOG.info("GroomServer: " + t.getJobID() + ", " + t.getJobFile() + ", " + t.getId() + ", " + t.getPartition());
+ LOG.info(t.runner);
+ //t.runner.start();
+ // TODO: execute task
+ }
+
+ private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
//
// Check if the last heartbeat got through...
// if so then build the heartbeat information for the BSPMaster;
@@ -273,7 +304,9 @@ public class GroomServer implements Runn
initialize();
startCleanupThreads();
boolean denied = false;
+ LOG.info("Why? " + running + ", " + shuttingDown + ", " + denied);
while (running && !shuttingDown && !denied) {
+
boolean staleState = false;
try {
while (running && !staleState && !shuttingDown && !denied) {
@@ -326,27 +359,6 @@ public class GroomServer implements Runn
RPC.stopProxy(jobClient);
}
- public static void main(String[] args) {
- StringUtils.startupShutdownMessage(GroomServer.class, args, LOG);
- if (args.length != 0) {
- System.out.println("usage: GroomServer");
- System.exit(-1);
- }
-
- try {
- Configuration conf = new HamaConfiguration();
- conf.set("bsp.master.port", "40000");
- conf.set("bsp.groom.port", "40020");
- conf.set("bsp.local.dir", conf.get("hadoop.tmp.dir") + "/bsp/local");
- conf.set("bsp.system.dir", conf.get("hadoop.tmp.dir") + "/bsp/system");
- GroomServer groom = GroomServer.constructGroomServer(GroomServer.class, conf);
- startGroomServer(groom);
- } catch (Throwable e) {
- LOG.fatal(StringUtils.stringifyException(e));
- System.exit(-1);
- }
- }
-
public static Thread startGroomServer(final GroomServer hrs) {
return startGroomServer(hrs,
"regionserver" + hrs.groomServerName);
@@ -425,4 +437,25 @@ public class GroomServer implements Runn
"Master: " + groomServerClass.toString(), e);
}
}
+
+ public static void main(String[] args) {
+ StringUtils.startupShutdownMessage(GroomServer.class, args, LOG);
+ if (args.length != 0) {
+ System.out.println("usage: GroomServer");
+ System.exit(-1);
+ }
+
+ try {
+ Configuration conf = new HamaConfiguration();
+ conf.addResource(new Path("/home/edward/workspace/hama-trunk/conf/hama-default.xml"));
+ conf.addResource(new Path("/home/edward/workspace/hama-trunk/conf/hama-site.xml"));
+
+ GroomServer groom = GroomServer.constructGroomServer(GroomServer.class, conf);
+ startGroomServer(groom);
+ } catch (Throwable e) {
+ LOG.fatal(StringUtils.stringifyException(e));
+ System.exit(-1);
+ }
+ }
+
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java Thu Jul 1 05:25:54 2010
@@ -144,7 +144,7 @@ public class GroomServerStatus implement
Text.writeString(out, host);
out.writeInt(failures);
out.writeInt(maxTasks);
- out.writeInt(taskReports.size());
+ out.writeInt(taskReports.size());
for(TaskStatus taskStatus : taskReports) {
taskStatus.write(out);
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java Thu Jul 1 05:25:54 2010
@@ -45,6 +45,7 @@ class JobInProgress {
static final Log LOG = LogFactory.getLog(JobInProgress.class);
+ Configuration conf;
JobProfile profile;
JobStatus status;
Path jobFile = null;
@@ -62,6 +63,7 @@ class JobInProgress {
public JobInProgress(BSPJobID jobId, BSPMaster master, Configuration conf)
throws IOException {
+ this.conf = conf;
this.jobId = jobId;
this.master = master;
@@ -80,10 +82,10 @@ class JobInProgress {
fs.copyToLocalFile(jobFile, localJobFile);
BSPJobContext job = new BSPJobContext(localJobFile, jobId);
- System.out.println("user:" + job.getUser());
- System.out.println("jobId:" + jobId);
- System.out.println("jobFile:" + jobFile.toString());
- System.out.println("jobName:" + job.getJobName());
+ LOG.info("user:" + job.getUser());
+ LOG.info("jobId:" + jobId);
+ LOG.info("jobFile:" + jobFile.toString());
+ LOG.info("jobName:" + job.getJobName());
this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(), job
.getJobName());
@@ -135,7 +137,16 @@ class JobInProgress {
// ///////////////////////////////////////////////////
public synchronized Task obtainNewTask(GroomServerStatus status,
int clusterSize, int numUniqueHosts) {
+ Task result = null;
+ try {
+ result = new TaskInProgress(getJobID(), this.jobFile.toString(), this.master, null, this,
+ numUniqueHosts).getTaskToRun(status);
+ LOG.info("JobInProgress: " + result.getJobID() + ", " + result.getJobFile() + ", " + result.getId() + ", " + result.getPartition());
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
- return null;
+ return result;
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/KillJobAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/KillJobAction.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/KillJobAction.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/KillJobAction.java Thu Jul 1 05:25:54 2010
@@ -21,6 +21,8 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.hadoop.io.Text;
+
/**
* Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
* {@link org.apache.hama.bsp.GroomServer} to kill the task of a job and cleanup
@@ -28,30 +30,30 @@ import java.io.IOException;
*
*/
class KillJobAction extends GroomServerAction {
- final BSPJobID jobId;
+ String jobId;
public KillJobAction() {
super(ActionType.KILL_JOB);
- jobId = new BSPJobID();
+ jobId = new String();
}
- public KillJobAction(BSPJobID jobId) {
+ public KillJobAction(String killJobId) {
super(ActionType.KILL_JOB);
- this.jobId = jobId;
+ this.jobId = killJobId;
}
- public BSPJobID getJobID() {
+ public String getJobID() {
return jobId;
}
@Override
public void write(DataOutput out) throws IOException {
- jobId.write(out);
+ Text.writeString(out, jobId);
}
@Override
public void readFields(DataInput in) throws IOException {
- jobId.readFields(in);
+ jobId = Text.readString(in);
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java Thu Jul 1 05:25:54 2010
@@ -21,6 +21,8 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.hadoop.io.Text;
+
/**
* Represents a directive from the {@link org.apache.hama.bsp.BSPMaster}
@@ -28,29 +30,29 @@ import java.io.IOException;
*
*/
class KillTaskAction extends GroomServerAction {
- final TaskAttemptID taskId;
+ String taskId;
public KillTaskAction() {
super(ActionType.KILL_TASK);
- taskId = new TaskAttemptID();
+ taskId = new String();
}
- public KillTaskAction(TaskAttemptID taskId) {
+ public KillTaskAction(String killTaskId) {
super(ActionType.KILL_TASK);
- this.taskId = taskId;
+ this.taskId = killTaskId;
}
- public TaskAttemptID getTaskID() {
+ public String getTaskID() {
return taskId;
}
@Override
public void write(DataOutput out) throws IOException {
- taskId.write(out);
+ Text.writeString(out, taskId);
}
@Override
public void readFields(DataInput in) throws IOException {
- taskId.readFields(in);
+ taskId = Text.readString(in);
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java Thu Jul 1 05:25:54 2010
@@ -9,7 +9,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.ipc.InterTrackerProtocol;
import org.apache.hama.ipc.JobSubmissionProtocol;
@@ -102,12 +101,6 @@ public class LocalJobRunner implements J
}
}
- @Override
- public JobStatus submitJob(BSPJobID jobName) throws IOException {
- // TODO Auto-generated method stub
- return null;
- }
-
/**
* Local Job
*/
@@ -116,16 +109,17 @@ public class LocalJobRunner implements J
private Configuration conf;
private int NUM_PEER;
private BSPJob job;
+ private String jobFile;
private boolean threadDone = false;
- private HashMap<String, BSPRunner> tasks = new HashMap<String, BSPRunner>();
+ private HashMap<String, Task> tasks = new HashMap<String, Task>();
public Job(BSPJobID jobID, String jobFile, Configuration conf)
throws IOException {
this.conf = conf;
+ this.jobFile = jobFile;
this.NUM_PEER = conf.getInt("bsp.peers.num", 0);
LOG.info("LocalJobRunner: " + jobID + ", " + jobFile);
this.job = new BSPJob(jobID, jobFile);
- LOG.info("Jar file: " + job.getJar());
LOG.info("Number of BSP tasks: " + NUM_PEER);
jobs.put(jobID.toString(), this);
@@ -158,17 +152,21 @@ public class LocalJobRunner implements J
TaskID tID;
for (int i = 0; i < NUM_PEER; i++) {
this.conf.set(Constants.PEER_PORT, String.valueOf(30000 + i));
- BSPRunner runner = (BSPRunner) ReflectionUtils.newInstance(
- BSPRunner.class, this.conf);
+ this.conf.setInt(Constants.PEER_ID, i);
tID = new TaskID(job.getJobID(), false, i);
- tasks.put(tID.toString(), runner);
+
+ Task bspRunner = new BSPTask(job.getJobID().getJtIdentifier(), jobFile, tID.toString(), i, this.conf);
+ LOG.info("Adding task '" + tID.toString() + "' for '" + bspRunner.getName() + "'");
+ tasks.put(tID.toString(), bspRunner);
}
- for (Map.Entry<String, BSPRunner> e : tasks.entrySet()) {
- e.getValue().start();
+ // Launching tasks
+ for (Map.Entry<String, Task> e : tasks.entrySet()) {
+ e.getValue().runner.start();
}
- for (Map.Entry<String, BSPRunner> e : tasks.entrySet()) {
+ // Barrier
+ for (Map.Entry<String, Task> e : tasks.entrySet()) {
try {
e.getValue().join();
} catch (InterruptedException e1) {
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java Thu Jul 1 05:25:54 2010
@@ -54,13 +54,11 @@ class SimpleTaskScheduler extends TaskSc
public List<Task> assignTasks(GroomServerStatus groomStatus)
throws IOException {
ClusterStatus clusterStatus = groomServerManager.getClusterStatus(false);
-
+
final int numGroomServers = clusterStatus.getGroomServers();
// final int clusterTaskCapacity = clusterStatus.getMaxTasks();
- //
// Get task counts for the current groom.
- //
// final int groomTaskCapacity = groom.getMaxTasks();
final int groomRunningTasks = groomStatus.countTasks();
@@ -73,14 +71,19 @@ class SimpleTaskScheduler extends TaskSc
// instance to the scheduler.
synchronized (jobQueue) {
for (JobInProgress job : jobQueue) {
+ /*
if (job.getStatus().getRunState() != JobStatus.RUNNING) {
continue;
}
+ */
Task t = null;
t = job.obtainNewTask(groomStatus, numGroomServers,
groomServerManager.getNumberOfUniqueHosts());
+
+ LOG.info("SimpleTaskScheduler: " + t.getJobID() + ", " + t.getJobFile() + ", " + t.getId() + ", " + t.getPartition());
+
if (t != null) {
assignedTasks.add(t);
break; // TODO - Now, simple scheduler assigns only one task to
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Thu Jul 1 05:25:54 2010
@@ -21,6 +21,8 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -28,23 +30,28 @@ import org.apache.hadoop.io.Writable;
/**
*
*/
-public class Task implements Writable {
+public class Task extends Thread implements Writable {
+ public static final Log LOG = LogFactory.getLog(Task.class);
////////////////////////////////////////////
// Fields
////////////////////////////////////////////
- private String jobFile;
- private TaskAttemptID taskId;
- private int partition;
+ protected String jobId;
+ protected String jobFile;
+ protected String taskId;
+ protected int partition;
+
+ protected BSPRunner runner;
protected LocalDirAllocator lDirAlloc;
/**
*
*/
public Task() {
- taskId = new TaskAttemptID();
+ taskId = new String();
}
- public Task(String jobFile, TaskAttemptID taskId, int partition) {
+ public Task(String jobId, String jobFile, String taskId, int partition) {
+ this.jobId = jobId;
this.jobFile = jobFile;
this.taskId = taskId;
@@ -62,7 +69,7 @@ public class Task implements Writable {
return jobFile;
}
- public TaskAttemptID getTaskID() {
+ public String getTaskID() {
return taskId;
}
@@ -70,8 +77,8 @@ public class Task implements Writable {
* Get the job name for this task.
* @return the job name
*/
- public BSPJobID getJobID() {
- return taskId.getJobID();
+ public String getJobID() {
+ return jobId;
}
/**
@@ -92,15 +99,18 @@ public class Task implements Writable {
////////////////////////////////////////////
@Override
public void write(DataOutput out) throws IOException {
+ Text.writeString(out, jobId);
Text.writeString(out, jobFile);
- taskId.write(out);
+ Text.writeString(out, taskId);
out.writeInt(partition);
}
@Override
public void readFields(DataInput in) throws IOException {
+ jobId = Text.readString(in);
jobFile = Text.readString(in);
- taskId.readFields(in);
+ taskId = Text.readString(in);
partition = in.readInt();
}
+
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Thu Jul 1 05:25:54 2010
@@ -17,10 +17,15 @@
*/
package org.apache.hama.bsp;
+import java.io.IOException;
import java.util.TreeMap;
+import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hama.Constants;
/**
*
@@ -28,7 +33,7 @@ import org.apache.commons.logging.LogFac
class TaskInProgress {
public static final Log LOG = LogFactory.getLog(TaskInProgress.class);
- private BSPJobContext context;
+ private Configuration conf;
// Constants
static final int MAX_TASK_EXECS = 1;
@@ -58,7 +63,7 @@ class TaskInProgress {
// Map from task Id -> GroomServer Id, contains tasks that are
// currently runnings
- private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();
+ private TreeMap<String, String> activeTasks = new TreeMap<String, String>();
// All attempt Ids of this TIP
// private TreeSet<TaskAttemptID> tasks = new TreeSet<TaskAttemptID>();
/**
@@ -66,15 +71,45 @@ class TaskInProgress {
*/
private TreeMap<TaskAttemptID, TaskStatus> taskStatuses = new TreeMap<TaskAttemptID, TaskStatus>();
+ private BSPJobID jobId;
+
public TaskInProgress(BSPJobID jobId, String jobFile, BSPMaster master,
- BSPJobContext context, JobInProgress job, int partition) {
+ Configuration conf, JobInProgress job, int partition) {
+ this.jobId = jobId;
this.jobFile = jobFile;
this.bspMaster = master;
this.job = job;
- this.context = context;
+ this.conf = conf;
this.partition = partition;
+
+ this.id = new TaskID(jobId, true, partition);
}
+ /**
+ * Return a Task that can be sent to a GroomServer for execution.
+ */
+ public Task getTaskToRun(GroomServerStatus status) throws IOException {
+ Task t = null;
+
+ String taskid = null;
+ if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts)) {
+ taskid = new String("task_" + nextTaskId);
+ ++nextTaskId;
+ } else {
+ LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + maxTaskAttempts) +
+ " attempts for the tip '" + getTIPId() + "'");
+ return null;
+ }
+
+ //this.conf.set(Constants.PEER_PORT, String.valueOf(30000));
+ t = new BSPTask(jobId.getJtIdentifier(), jobFile, taskid, partition, this.conf);
+ activeTasks.put(taskid, status.getGroomName());
+
+ // Ask JobTracker to note that the task exists
+ bspMaster.createTaskEntry(taskid, status.getGroomName(), this);
+ return t;
+ }
+
// //////////////////////////////////
// Accessors
// //////////////////////////////////
@@ -123,4 +158,18 @@ class TaskInProgress {
public synchronized boolean isComplete() {
return (completes > 0);
}
+
+ private TreeSet tasksReportedClosed = new TreeSet();
+
+ public boolean shouldCloseForClosedJob(String taskid) {
+ TaskStatus ts = (TaskStatus) taskStatuses.get(taskid);
+ if ((ts != null) &&
+ (! tasksReportedClosed.contains(taskid)) &&
+ (job.getStatus().getRunState() != JobStatus.RUNNING)) {
+ tasksReportedClosed.add(taskid);
+ return true;
+ } else {
+ return false;
+ }
+ }
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskScheduler.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskScheduler.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskScheduler.java Thu Jul 1 05:25:54 2010
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.mortbay.log.Log;
/**
* Used by a {@link BSPMaster} to schedule {@link Task}s on {@link GroomServer}
@@ -43,6 +44,7 @@ abstract class TaskScheduler implements
public synchronized void setGroomServerManager(
GroomServerManager groomServerManager) {
+ Log.info("TaskScheduler.setGroomServermanager()");
this.groomServerManager = groomServerManager;
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java Thu Jul 1 05:25:54 2010
@@ -44,11 +44,14 @@ public interface JobSubmissionProtocol e
* that job.
* The job files should be submitted in <b>system-dir</b>/<b>jobName</b>.
*
- * @param jobName
+ * @param jobID
+ * @param jobFile
* @return jobStatus
* @throws IOException
*/
- public JobStatus submitJob(BSPJobID jobName) throws IOException;
+ //public JobStatus submitJob(BSPJobID jobName) throws IOException;
+
+ public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException;
/**
* Get the current status of the cluster
@@ -104,7 +107,5 @@ public interface JobSubmissionProtocol e
*/
public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException;
-
- JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException;
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/util/ClusterUtil.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/util/ClusterUtil.java?rev=959515&r1=959514&r2=959515&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/util/ClusterUtil.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/util/ClusterUtil.java Thu Jul 1 05:25:54 2010
@@ -4,6 +4,7 @@ import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPMaster;
import org.apache.hama.bsp.GroomServer;
import org.apache.log4j.Logger;
@@ -76,7 +77,7 @@ public class ClusterUtil {
public static String startup(final BSPMaster m,
final List<ClusterUtil.GroomServerThread> groomservers, Configuration conf) throws IOException, InterruptedException {
if (m != null) {
- m.start();
+ m.startMaster((HamaConfiguration) conf);
}
if (groomservers != null) {