You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/08/30 05:08:49 UTC
svn commit: r264685 - in
/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred:
InterTrackerProtocol.java JobTracker.java TaskTracker.java
Author: cutting
Date: Mon Aug 29 20:08:46 2005
New Revision: 264685
URL: http://svn.apache.org/viewcvs?rev=264685&view=rev
Log:
Synchronize things in TaskTracker.offerService() loop. Also remove boxing in the heartbeat RPC.
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InterTrackerProtocol.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InterTrackerProtocol.java?rev=264685&r1=264684&r2=264685&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InterTrackerProtocol.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InterTrackerProtocol.java Mon Aug 29 20:08:46 2005
@@ -35,7 +35,7 @@
* TaskTracker must also indicate whether this is the first interaction
* (since state refresh)
*/
- IntWritable emitHeartbeat(TaskTrackerStatus status, BooleanWritable initialContact);
+ int emitHeartbeat(TaskTrackerStatus status, boolean initialContact);
/** Called to get new tasks from the job tracker for this tracker.*/
Task pollForNewTask(String trackerName);
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java?rev=264685&r1=264684&r2=264685&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java Mon Aug 29 20:08:46 2005
@@ -329,13 +329,13 @@
/**
* Process incoming heartbeat messages from the task trackers.
*/
- public synchronized IntWritable emitHeartbeat(TaskTrackerStatus trackerStatus, BooleanWritable initialContact) {
+ public synchronized int emitHeartbeat(TaskTrackerStatus trackerStatus, boolean initialContact) {
String trackerName = trackerStatus.getTrackerName();
trackerStatus.setLastSeen(System.currentTimeMillis());
synchronized (taskTrackers) {
synchronized (trackerExpiryQueue) {
- if (initialContact.get()) {
+ if (initialContact) {
// If it's first contact, then clear out any state hanging around
if (taskTrackers.get(trackerName) != null) {
taskTrackers.remove(trackerName);
@@ -344,14 +344,14 @@
} else {
// If not first contact, there should be some record of the tracker
if (taskTrackers.get(trackerName) == null) {
- return new IntWritable(InterTrackerProtocol.UNKNOWN_TASKTRACKER);
+ return InterTrackerProtocol.UNKNOWN_TASKTRACKER;
}
}
// Store latest state. If first contact, then save current
// state in expiry queue
taskTrackers.put(trackerName, trackerStatus);
- if (initialContact.get()) {
+ if (initialContact) {
trackerExpiryQueue.add(trackerStatus);
}
}
@@ -359,7 +359,7 @@
updateTaskStatuses(trackerStatus);
//LOG.info("Got heartbeat from "+trackerName);
- return new IntWritable(InterTrackerProtocol.TRACKERS_OK);
+ return InterTrackerProtocol.TRACKERS_OK;
}
/**
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java?rev=264685&r1=264684&r2=264685&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java Mon Aug 29 20:08:46 2005
@@ -124,7 +124,7 @@
* within the same process space might be restarted, so everything must be
* clean.
*/
- public void close() throws IOException {
+ public synchronized void close() throws IOException {
// Kill running tasks
Vector v = new Vector();
for (Iterator it = tasks.values().iterator(); it.hasNext(); ) {
@@ -186,7 +186,7 @@
// Emit standard heartbeat message to check in with JobTracker
//
Vector taskReports = new Vector();
- synchronized (runningTasks) {
+ synchronized (this) {
for (Iterator it = runningTasks.keySet().iterator(); it.hasNext(); ) {
String taskid = (String) it.next();
TaskInProgress tip = (TaskInProgress) runningTasks.get(taskid);
@@ -204,11 +204,11 @@
if (justStarted) {
this.fs = NutchFileSystem.getNamed(jobClient.getFilesystemName());
}
-
- IntWritable resultCode = jobClient.emitHeartbeat(new TaskTrackerStatus(taskTrackerName, localHostname, mapOutputPort, taskReports), new BooleanWritable(justStarted));
+
+ int resultCode = jobClient.emitHeartbeat(new TaskTrackerStatus(taskTrackerName, localHostname, mapOutputPort, taskReports), justStarted);
justStarted = false;
-
- if (resultCode.get() == InterTrackerProtocol.UNKNOWN_TASKTRACKER) {
+
+ if (resultCode == InterTrackerProtocol.UNKNOWN_TASKTRACKER) {
return STALE_STATE;
}
@@ -219,8 +219,10 @@
Task t = jobClient.pollForNewTask(taskTrackerName);
if (t != null) {
TaskInProgress tip = new TaskInProgress(t);
- tasks.put(t.getTaskId(), tip);
- runningTasks.put(t.getTaskId(), tip);
+ synchronized (this) {
+ tasks.put(t.getTaskId(), tip);
+ runningTasks.put(t.getTaskId(), tip);
+ }
tip.launchTask();
}
}
@@ -228,7 +230,7 @@
//
// Kill any tasks that have not reported progress in the last X seconds.
//
- synchronized (runningTasks) {
+ synchronized (this) {
for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) {
TaskInProgress tip = (TaskInProgress) it.next();
if ((tip.getRunState() == TaskStatus.RUNNING) &&
@@ -245,8 +247,10 @@
//
String toCloseId = jobClient.pollForClosedTask(taskTrackerName);
if (toCloseId != null) {
+ synchronized (this) {
TaskInProgress tip = (TaskInProgress) tasks.get(toCloseId);
tip.cleanup();
+ }
}
lastHeartbeat = now;
}
@@ -538,7 +542,7 @@
/**
* The task is no longer running. It may not have completed successfully
*/
- void reportTaskFinished(String taskid) {
+ synchronized void reportTaskFinished(String taskid) {
TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
tip.taskFinished();
}