You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/12/07 08:55:40 UTC
svn commit: r1042928 - in /incubator/hama/trunk: ./
src/java/org/apache/hama/bsp/ src/test/org/apache/hama/bsp/
Author: edwardyoon
Date: Tue Dec 7 07:55:39 2010
New Revision: 1042928
URL: http://svn.apache.org/viewvc?rev=1042928&view=rev
Log:
The all taskid variable's type should be declared as a TaskAttemptID
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java
incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1042928&r1=1042927&r2=1042928&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Dec 7 07:55:39 2010
@@ -50,6 +50,8 @@ Trunk (unreleased changes)
IMPROVEMENTS
+ HAMA-336: The all taskid variable's type should be declared as a TaskAttemptID
+ (edwardyoon)
HAMA-331: Removing JobInProgressListener
and adding JobInitThread to BSPMaster (edwardyoon)
HAMA-334: Removing "java5.home" env key from build script. (edwardyoon)
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=1042928&r1=1042927&r2=1042928&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Tue Dec 7 07:55:39 2010
@@ -67,7 +67,7 @@ public class BSPMaster implements JobSub
private static final int FS_ACCESS_RETRY_PERIOD = 10000;
public static final long GROOMSERVER_EXPIRY_INTERVAL = 10 * 60 * 1000;
static long JOBINIT_SLEEP_INTERVAL = 2000;
-
+
// States
State state = State.INITIALIZING;
@@ -102,9 +102,9 @@ public class BSPMaster implements JobSub
private Map<BSPJobID, JobInProgress> jobs = new TreeMap<BSPJobID, JobInProgress>();
private TaskScheduler taskScheduler;
- TreeMap<String, String> taskIdToGroomNameMap = new TreeMap<String, String>();
- TreeMap<String, TreeSet<String>> groomNameToTaskIdsMap = new TreeMap<String, TreeSet<String>>();
- Map<String, TaskInProgress> taskIdToTaskInProgressMap = new TreeMap<String, TaskInProgress>();
+ TreeMap<TaskAttemptID, String> taskIdToGroomNameMap = new TreeMap<TaskAttemptID, String>();
+ TreeMap<String, TreeSet<TaskAttemptID>> groomNameToTaskIdsMap = new TreeMap<String, TreeSet<TaskAttemptID>>();
+ Map<TaskAttemptID, TaskInProgress> taskIdToTaskInProgressMap = new TreeMap<TaskAttemptID, TaskInProgress>();
Vector<JobInProgress> jobInitQueue = new Vector<JobInProgress>();
JobInitThread initJobs = new JobInitThread();
@@ -195,43 +195,45 @@ public class BSPMaster implements JobSub
return activeGrooms;
}
- /////////////////////////////////////////////////////////////////
- // Used to init new jobs that have just been created
- /////////////////////////////////////////////////////////////////
+ // ///////////////////////////////////////////////////////////////
+ // Used to init new jobs that have just been created
+ // ///////////////////////////////////////////////////////////////
class JobInitThread implements Runnable {
private volatile boolean shouldRun = true;
-
- public JobInitThread() {
- }
- public void run() {
- while (shouldRun) {
- JobInProgress job = null;
- synchronized (jobInitQueue) {
- if (jobInitQueue.size() > 0) {
- job = (JobInProgress) jobInitQueue.elementAt(0);
- jobInitQueue.remove(job);
- } else {
- try {
- jobInitQueue.wait(JOBINIT_SLEEP_INTERVAL);
- } catch (InterruptedException iex) {
- }
- }
- }
- try {
- if (job != null) {
- job.initTasks();
- }
- } catch (Exception e) {
- LOG.warn("job init failed: " + e);
- job.kill();
- }
+
+ public JobInitThread() {
+ }
+
+ public void run() {
+ while (shouldRun) {
+ JobInProgress job = null;
+ synchronized (jobInitQueue) {
+ if (jobInitQueue.size() > 0) {
+ job = (JobInProgress) jobInitQueue.elementAt(0);
+ jobInitQueue.remove(job);
+ } else {
+ try {
+ jobInitQueue.wait(JOBINIT_SLEEP_INTERVAL);
+ } catch (InterruptedException iex) {
+ }
}
+ }
+ try {
+ if (job != null) {
+ job.initTasks();
+ }
+ } catch (Exception e) {
+ LOG.warn("job init failed: " + e);
+ job.kill();
+ }
}
- public void stopIniter() {
- shouldRun = false;
- }
+ }
+
+ public void stopIniter() {
+ shouldRun = false;
+ }
}
-
+
// /////////////////////////////////////////////////////////////
// BSPMaster methods
// /////////////////////////////////////////////////////////////
@@ -303,7 +305,7 @@ public class BSPMaster implements JobSub
public void offerService() throws InterruptedException, IOException {
new Thread(this.initJobs).start();
LOG.info("Starting jobInitThread");
-
+
this.interTrackerServer.start();
synchronized (this) {
@@ -355,11 +357,12 @@ public class BSPMaster implements JobSub
groomToHeartbeatResponseMap.remove(groomName);
}
return new HeartbeatResponse(newResponseId,
- new GroomServerAction[] { new ReinitGroomAction() },
- Collections.<String, String>emptyMap());
+ new GroomServerAction[] { new ReinitGroomAction() }, Collections
+ .<String, String> emptyMap());
}
- HeartbeatResponse response = new HeartbeatResponse(newResponseId, null, groomServerPeers);
+ HeartbeatResponse response = new HeartbeatResponse(newResponseId, null,
+ groomServerPeers);
List<GroomServerAction> actions = new ArrayList<GroomServerAction>();
// Check for new tasks to be executed on the groom server
@@ -372,7 +375,7 @@ public class BSPMaster implements JobSub
for (Task task : taskList) {
if (task != null) {
- actions.add(new LaunchTaskAction(task));
+ actions.add(new LaunchTaskAction(task));
}
}
}
@@ -383,7 +386,7 @@ public class BSPMaster implements JobSub
groomToHeartbeatResponseMap.put(groomName, response);
removeMarkedTasks(groomName);
updateTaskStatuses(status);
-
+
return response;
}
@@ -391,8 +394,9 @@ public class BSPMaster implements JobSub
for (Iterator<TaskStatus> it = status.taskReports(); it.hasNext();) {
TaskStatus report = it.next();
report.setGroomServer(status.getGroomName());
- String taskId = report.getTaskId();
- TaskInProgress tip = (TaskInProgress) taskIdToTaskInProgressMap.get(taskId);
+ TaskAttemptID taskId = report.getTaskId();
+ TaskInProgress tip = (TaskInProgress) taskIdToTaskInProgressMap
+ .get(taskId);
if (tip == null) {
LOG.info("Serious problem. While updating status, cannot find taskid "
@@ -414,14 +418,14 @@ public class BSPMaster implements JobSub
}
// (trackerID -> TreeSet of completed taskids running at that tracker)
- TreeMap<String, Set<String>> trackerToMarkedTasksMap = new TreeMap<String, Set<String>>();
+ TreeMap<String, TreeSet<TaskAttemptID>> trackerToMarkedTasksMap = new TreeMap<String, TreeSet<TaskAttemptID>>();
private void removeMarkedTasks(String groomName) {
// Purge all the 'marked' tasks which were running at taskTracker
- TreeSet<String> markedTaskSet = (TreeSet<String>) trackerToMarkedTasksMap
+ TreeSet<TaskAttemptID> markedTaskSet = trackerToMarkedTasksMap
.get(groomName);
if (markedTaskSet != null) {
- for (String taskid : markedTaskSet) {
+ for (TaskAttemptID taskid : markedTaskSet) {
removeTaskEntry(taskid);
LOG.info("Removed completed task '" + taskid + "' from '" + groomName
+ "'");
@@ -431,13 +435,13 @@ public class BSPMaster implements JobSub
}
}
- private void removeTaskEntry(String taskid) {
+ private void removeTaskEntry(TaskAttemptID taskid) {
// taskid --> groom
String groom = taskIdToGroomNameMap.remove(taskid);
// groom --> taskid
if (groom != null) {
- TreeSet<String> groomSet = groomNameToTaskIdsMap.get(groom);
+ TreeSet<TaskAttemptID> groomSet = groomNameToTaskIdsMap.get(groom);
if (groomSet != null) {
groomSet.remove(taskid);
}
@@ -449,12 +453,13 @@ public class BSPMaster implements JobSub
}
private List<GroomServerAction> getTasksToKill(String groomName) {
- Set<String> taskIds = (TreeSet<String>) groomNameToTaskIdsMap.get(groomName);
+ Set<TaskAttemptID> taskIds = groomNameToTaskIdsMap.get(groomName);
if (taskIds != null) {
List<GroomServerAction> killList = new ArrayList<GroomServerAction>();
Set<String> killJobIds = new TreeSet<String>();
- for (String killTaskId : taskIds) {
- TaskInProgress tip = (TaskInProgress) taskIdToTaskInProgressMap.get(killTaskId);
+ for (TaskAttemptID killTaskId : taskIds) {
+ TaskInProgress tip = (TaskInProgress) taskIdToTaskInProgressMap
+ .get(killTaskId);
if (tip.shouldCloseForClosedJob(killTaskId)) {
//
// This is how the BSPMaster ends a task at the GroomServer.
@@ -482,7 +487,7 @@ public class BSPMaster implements JobSub
return null;
}
-
+
/**
* Process incoming heartbeat messages from the groom.
*/
@@ -499,7 +504,8 @@ public class BSPMaster implements JobSub
}
if (initialContact) {
- groomServerPeers.put(groomStatus.getGroomName(), groomStatus.getPeerName());
+ groomServerPeers.put(groomStatus.getGroomName(), groomStatus
+ .getPeerName());
}
return true;
@@ -571,7 +577,7 @@ public class BSPMaster implements JobSub
jobInitQueue.notifyAll();
}
}
-
+
return job.getStatus();
}
@@ -686,7 +692,7 @@ public class BSPMaster implements JobSub
this.interTrackerServer.stop();
}
- public void createTaskEntry(String taskid, String groomServer,
+ public void createTaskEntry(TaskAttemptID taskid, String groomServer,
TaskInProgress taskInProgress) {
LOG.info("Adding task '" + taskid + "' to tip " + taskInProgress.getTIPId()
+ ", for groom '" + groomServer + "'");
@@ -695,9 +701,9 @@ public class BSPMaster implements JobSub
taskIdToGroomNameMap.put(taskid, groomServer);
// groom --> taskid
- TreeSet<String> taskset = groomNameToTaskIdsMap.get(groomServer);
+ TreeSet<TaskAttemptID> taskset = groomNameToTaskIdsMap.get(groomServer);
if (taskset == null) {
- taskset = new TreeSet<String>();
+ taskset = new TreeSet<TaskAttemptID>();
groomNameToTaskIdsMap.put(groomServer, taskset);
}
taskset.add(taskid);
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java?rev=1042928&r1=1042927&r2=1042928&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java Tue Dec 7 07:55:39 2010
@@ -22,7 +22,7 @@ public class BSPTask extends Task {
public BSPTask() {
}
- public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition) {
+ public BSPTask(BSPJobID jobId, String jobFile, TaskAttemptID taskid, int partition) {
this.jobId = jobId;
this.jobFile = jobFile;
this.taskId = taskid;
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1042928&r1=1042927&r2=1042928&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Tue Dec 7 07:55:39 2010
@@ -87,7 +87,7 @@ public class GroomServer implements Runn
private int maxCurrentTasks = 1;
Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
/** Map from taskId -> TaskInProgress. */
- Map<String, TaskInProgress> runningTasks = null;
+ Map<TaskAttemptID, TaskInProgress> runningTasks = null;
Map<BSPJobID, RunningJob> runningJobs = null;
private BlockingQueue<GroomServerAction> tasksToCleanup = new LinkedBlockingQueue<GroomServerAction>();
@@ -122,7 +122,7 @@ public class GroomServer implements Runn
// Clear out state tables
this.tasks.clear();
this.runningJobs = new TreeMap<BSPJobID, RunningJob>();
- this.runningTasks = new LinkedHashMap<String, TaskInProgress>();
+ this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
this.acceptNewTasks = true;
this.conf.set(Constants.PEER_HOST, localHostname);
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java?rev=1042928&r1=1042927&r2=1042928&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java Tue Dec 7 07:55:39 2010
@@ -53,7 +53,9 @@ class JobInProgress {
Path localJobFile = null;
Path localJarFile = null;
private LocalFileSystem localFs;
-
+ // Indicates how many times the job got restarted
+ private int restartCount;
+
long startTime;
long launchTime;
long finishTime;
@@ -66,7 +68,7 @@ class JobInProgress {
int numBSPTasks = 0;
int clusterSize;
-
+
public JobInProgress(BSPJobID jobId, BSPMaster master, Configuration conf)
throws IOException {
this.conf = conf;
@@ -77,8 +79,10 @@ class JobInProgress {
this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP);
this.startTime = System.currentTimeMillis();
status.setStartTime(startTime);
+
this.superstepCounter = 0;
-
+ this.restartCount = 0;
+
this.localJobFile = master.getLocalPath(BSPMaster.SUBDIR + "/" + jobId
+ ".xml");
this.localJarFile = master.getLocalPath(BSPMaster.SUBDIR + "/" + jobId
@@ -191,7 +195,7 @@ class JobInProgress {
}
public synchronized void completedTask(TaskInProgress tip, TaskStatus status) {
- String taskid = status.getTaskId();
+ TaskAttemptID taskid = status.getTaskId();
updateTaskStatus(tip, status);
LOG.info("Taskid '" + taskid + "' has finished successfully.");
tip.completed(taskid);
@@ -272,4 +276,12 @@ class JobInProgress {
LOG.info("Error cleaning up " + profile.getJobID() + ": " + e);
}
}
+
+ /**
+ * Get the number of times the job has restarted
+ */
+ int getNumRestarts() {
+ return restartCount;
+ }
+
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java?rev=1042928&r1=1042927&r2=1042928&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java Tue Dec 7 07:55:39 2010
@@ -21,38 +21,35 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.hadoop.io.Text;
-
-
/**
* Represents a directive from the {@link org.apache.hama.bsp.BSPMaster}
* to the {@link org.apache.hama.bsp.GroomServer} to kill a task.
*
*/
class KillTaskAction extends GroomServerAction {
- String taskId;
+ TaskAttemptID taskId;
public KillTaskAction() {
super(ActionType.KILL_TASK);
- taskId = new String();
+ taskId = new TaskAttemptID();
}
- public KillTaskAction(String killTaskId) {
+ public KillTaskAction(TaskAttemptID killTaskId) {
super(ActionType.KILL_TASK);
this.taskId = killTaskId;
}
- public String getTaskID() {
+ public TaskAttemptID getTaskID() {
return taskId;
}
@Override
public void write(DataOutput out) throws IOException {
- Text.writeString(out, taskId);
+ taskId.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
- taskId = Text.readString(in);
+ taskId.readFields(in);
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=1042928&r1=1042927&r2=1042928&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Tue Dec 7 07:55:39 2010
@@ -35,17 +35,17 @@ public abstract class Task implements Wr
protected BSPJobID jobId;
protected String jobFile;
- protected String taskId;
+ protected TaskAttemptID taskId;
protected int partition;
protected LocalDirAllocator lDirAlloc;
public Task() {
jobId = new BSPJobID();
- taskId = new String();
+ taskId = new TaskAttemptID();
}
- public Task(BSPJobID jobId, String jobFile, String taskId, int partition) {
+ public Task(BSPJobID jobId, String jobFile, TaskAttemptID taskId, int partition) {
this.jobId = jobId;
this.jobFile = jobFile;
this.taskId = taskId;
@@ -63,7 +63,7 @@ public abstract class Task implements Wr
return jobFile;
}
- public String getTaskID() {
+ public TaskAttemptID getTaskID() {
return taskId;
}
@@ -95,7 +95,7 @@ public abstract class Task implements Wr
public void write(DataOutput out) throws IOException {
jobId.write(out);
Text.writeString(out, jobFile);
- Text.writeString(out, taskId);
+ taskId.write(out);
out.writeInt(partition);
}
@@ -103,7 +103,7 @@ public abstract class Task implements Wr
public void readFields(DataInput in) throws IOException {
jobId.readFields(in);
jobFile = Text.readString(in);
- taskId = Text.readString(in);
+ taskId.readFields(in);
partition = in.readInt();
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java?rev=1042928&r1=1042927&r2=1042928&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java Tue Dec 7 07:55:39 2010
@@ -21,6 +21,9 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+/**
+ * TaskAttemptID is a unique identifier for a task attempt.
+ */
public class TaskAttemptID extends ID {
protected static final String ATTEMPT = "attempt";
private TaskID taskId;
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=1042928&r1=1042927&r2=1042928&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Tue Dec 7 07:55:39 2010
@@ -38,7 +38,8 @@ class TaskInProgress {
static final int MAX_TASK_EXECS = 1;
int maxTaskAttempts = 4;
private boolean failed = false;
-
+ private static final int NUM_ATTEMPTS_PER_RESTART = 1000;
+
// Job Meta
private String jobFile = null;
private int partition;
@@ -63,13 +64,13 @@ class TaskInProgress {
// Map from task Id -> GroomServer Id, contains tasks that are
// currently runnings
- private TreeMap<String, String> activeTasks = new TreeMap<String, String>();
+ private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();
// All attempt Ids of this TIP
// private TreeSet<TaskAttemptID> tasks = new TreeSet<TaskAttemptID>();
/**
* Map from taskId -> TaskStatus
*/
- private TreeMap<String, TaskStatus> taskStatuses = new TreeMap<String, TaskStatus>();
+ private TreeMap<TaskAttemptID, TaskStatus> taskStatuses = new TreeMap<TaskAttemptID, TaskStatus>();
private BSPJobID jobId;
@@ -92,9 +93,12 @@ class TaskInProgress {
Task t = null;
// TODO use the TaskID, instead of String.
- String taskid = null;
+ TaskAttemptID taskid = null;
if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts)) {
- taskid = new String("task_" + status.getGroomName() + "_" + nextTaskId);
+ int attemptId = job.getNumRestarts() * NUM_ATTEMPTS_PER_RESTART + nextTaskId;
+ taskid = new TaskAttemptID( id, attemptId);
+ //new String("task_" + status.getGroomName() + "_" + nextTaskId);
+
++nextTaskId;
} else {
LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + maxTaskAttempts)
@@ -131,7 +135,7 @@ class TaskInProgress {
return id;
}
- public TreeMap<String, String> getTasks() {
+ public TreeMap<TaskAttemptID, String> getTasks() {
return activeTasks;
}
@@ -163,9 +167,9 @@ class TaskInProgress {
return (completes > 0);
}
- private TreeSet<String> tasksReportedClosed = new TreeSet<String>();
+ private TreeSet<TaskAttemptID> tasksReportedClosed = new TreeSet<TaskAttemptID>();
- public boolean shouldCloseForClosedJob(String taskid) {
+ public boolean shouldCloseForClosedJob(TaskAttemptID taskid) {
TaskStatus ts = (TaskStatus) taskStatuses.get(taskid);
if ((ts != null) && (!tasksReportedClosed.contains(taskid))
&& (job.getStatus().getRunState() != JobStatus.RUNNING)) {
@@ -176,8 +180,8 @@ class TaskInProgress {
}
}
- public void completed(String taskid) {
- LOG.info("Task '" + taskid + "' has completed.");
+ public void completed(TaskAttemptID taskid) {
+ LOG.info("Task '" + taskid.getTaskID().toString() + "' has completed.");
TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
status.setRunState(TaskStatus.State.SUCCEEDED);
activeTasks.remove(taskid);
@@ -195,7 +199,7 @@ class TaskInProgress {
taskStatuses.put(status.getTaskId(), status);
}
- public TaskStatus getTaskStatus(String taskId) {
+ public TaskStatus getTaskStatus(TaskAttemptID taskId) {
return this.taskStatuses.get(taskId);
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java?rev=1042928&r1=1042927&r2=1042928&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java Tue Dec 7 07:55:39 2010
@@ -40,7 +40,7 @@ class TaskStatus implements Writable, Cl
RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN
}
- private String taskId;
+ private TaskAttemptID taskId;
private float progress;
private volatile State runState;
private String stateString;
@@ -56,11 +56,11 @@ class TaskStatus implements Writable, Cl
*
*/
public TaskStatus() {
- taskId = new String();
+ taskId = new TaskAttemptID();
this.superstepCount = 0;
}
- public TaskStatus(String taskId, float progress, State runState,
+ public TaskStatus(TaskAttemptID taskId, float progress, State runState,
String stateString, String groomServer, Phase phase) {
this.taskId = taskId;
this.progress = progress;
@@ -75,7 +75,7 @@ class TaskStatus implements Writable, Cl
// Accessors and Modifiers
// //////////////////////////////////////////////////
- public String getTaskId() {
+ public TaskAttemptID getTaskId() {
return taskId;
}
@@ -242,7 +242,7 @@ class TaskStatus implements Writable, Cl
@Override
public void readFields(DataInput in) throws IOException {
- this.taskId = Text.readString(in);
+ this.taskId.readFields(in);
this.progress = in.readFloat();
this.runState = WritableUtils.readEnum(in, State.class);
this.stateString = Text.readString(in);
@@ -254,7 +254,7 @@ class TaskStatus implements Writable, Cl
@Override
public void write(DataOutput out) throws IOException {
- Text.writeString(out, taskId);
+ taskId.write(out);
out.writeFloat(progress);
WritableUtils.writeEnum(out, runState);
Text.writeString(out, stateString);
Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java?rev=1042928&r1=1042927&r2=1042928&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java Tue Dec 7 07:55:39 2010
@@ -96,8 +96,7 @@ public class TestBSPPeer extends HamaClu
peerNames.add("localhost:" + (30000 + i));
}
peer.setAllPeerNames(peerNames);
- TaskStatus currentTaskStatus = new TaskStatus("localhost:"
- + lastTwoDigitsOfPort, 0, null, null, null, null);
+ TaskStatus currentTaskStatus = new TaskStatus(new TaskAttemptID(), 0, null, null, null, null);
peer.setCurrentTaskStatus(currentTaskStatus);
BSPJob jobConf = new BSPJob(conf, NUM_PEER);
peer.setJobConf((BSPJob) jobConf);