You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/08/30 05:08:49 UTC

svn commit: r264685 - in /lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred: InterTrackerProtocol.java JobTracker.java TaskTracker.java

Author: cutting
Date: Mon Aug 29 20:08:46 2005
New Revision: 264685

URL: http://svn.apache.org/viewcvs?rev=264685&view=rev
Log:
Synchronize things in TaskTracker.offerService() loop.  Also remove boxing in the heartbeat RPC.

Modified:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InterTrackerProtocol.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InterTrackerProtocol.java?rev=264685&r1=264684&r2=264685&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InterTrackerProtocol.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InterTrackerProtocol.java Mon Aug 29 20:08:46 2005
@@ -35,7 +35,7 @@
    * TaskTracker must also indicate whether this is the first interaction
    * (since state refresh)
    */
-  IntWritable emitHeartbeat(TaskTrackerStatus status, BooleanWritable initialContact);
+  int emitHeartbeat(TaskTrackerStatus status, boolean initialContact);
 
   /** Called to get new tasks from the job tracker for this tracker.*/
   Task pollForNewTask(String trackerName);

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java?rev=264685&r1=264684&r2=264685&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobTracker.java Mon Aug 29 20:08:46 2005
@@ -329,13 +329,13 @@
     /**
      * Process incoming heartbeat messages from the task trackers.
      */
-    public synchronized IntWritable emitHeartbeat(TaskTrackerStatus trackerStatus, BooleanWritable initialContact) {
+    public synchronized int emitHeartbeat(TaskTrackerStatus trackerStatus, boolean initialContact) {
         String trackerName = trackerStatus.getTrackerName();
         trackerStatus.setLastSeen(System.currentTimeMillis());
 
         synchronized (taskTrackers) {
             synchronized (trackerExpiryQueue) {
-                if (initialContact.get()) {
+                if (initialContact) {
                     // If it's first contact, then clear out any state hanging around
                     if (taskTrackers.get(trackerName) != null) {
                         taskTrackers.remove(trackerName);
@@ -344,14 +344,14 @@
                 } else {
                     // If not first contact, there should be some record of the tracker
                     if (taskTrackers.get(trackerName) == null) {
-                        return new IntWritable(InterTrackerProtocol.UNKNOWN_TASKTRACKER);
+                        return InterTrackerProtocol.UNKNOWN_TASKTRACKER;
                     }
                 }
 
                 // Store latest state.  If first contact, then save current
                 // state in expiry queue
                 taskTrackers.put(trackerName, trackerStatus);
-                if (initialContact.get()) {
+                if (initialContact) {
                     trackerExpiryQueue.add(trackerStatus);
                 }
             }
@@ -359,7 +359,7 @@
 
         updateTaskStatuses(trackerStatus);
         //LOG.info("Got heartbeat from "+trackerName);
-        return new IntWritable(InterTrackerProtocol.TRACKERS_OK);
+        return InterTrackerProtocol.TRACKERS_OK;
     }
 
     /**

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java?rev=264685&r1=264684&r2=264685&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java Mon Aug 29 20:08:46 2005
@@ -124,7 +124,7 @@
      * within the same process space might be restarted, so everything must be
      * clean.
      */
-    public void close() throws IOException {
+    public synchronized void close() throws IOException {
         // Kill running tasks
         Vector v = new Vector();
         for (Iterator it = tasks.values().iterator(); it.hasNext(); ) {
@@ -186,7 +186,7 @@
             // Emit standard heartbeat message to check in with JobTracker
             //
             Vector taskReports = new Vector();
-            synchronized (runningTasks) {
+            synchronized (this) {
                 for (Iterator it = runningTasks.keySet().iterator(); it.hasNext(); ) {
                     String taskid = (String) it.next();
                     TaskInProgress tip = (TaskInProgress) runningTasks.get(taskid);
@@ -204,11 +204,11 @@
             if (justStarted) {
                 this.fs = NutchFileSystem.getNamed(jobClient.getFilesystemName());
             }
-
-            IntWritable resultCode = jobClient.emitHeartbeat(new TaskTrackerStatus(taskTrackerName, localHostname, mapOutputPort, taskReports), new BooleanWritable(justStarted));
+            
+            int resultCode = jobClient.emitHeartbeat(new TaskTrackerStatus(taskTrackerName, localHostname, mapOutputPort, taskReports), justStarted);
             justStarted = false;
-
-            if (resultCode.get() == InterTrackerProtocol.UNKNOWN_TASKTRACKER) {
+              
+            if (resultCode == InterTrackerProtocol.UNKNOWN_TASKTRACKER) {
                 return STALE_STATE;
             }
 
@@ -219,8 +219,10 @@
                 Task t = jobClient.pollForNewTask(taskTrackerName);
                 if (t != null) {
                     TaskInProgress tip = new TaskInProgress(t);
-                    tasks.put(t.getTaskId(), tip);
-                    runningTasks.put(t.getTaskId(), tip);
+                    synchronized (this) {
+                      tasks.put(t.getTaskId(), tip);
+                      runningTasks.put(t.getTaskId(), tip);
+                    }
                     tip.launchTask();
                 }
             }
@@ -228,7 +230,7 @@
             //
             // Kill any tasks that have not reported progress in the last X seconds.
             //
-            synchronized (runningTasks) {
+            synchronized (this) {
                 for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) {
                     TaskInProgress tip = (TaskInProgress) it.next();
                     if ((tip.getRunState() == TaskStatus.RUNNING) &&
@@ -245,8 +247,10 @@
             //
             String toCloseId = jobClient.pollForClosedTask(taskTrackerName);
             if (toCloseId != null) {
+              synchronized (this) {
                 TaskInProgress tip = (TaskInProgress) tasks.get(toCloseId);
                 tip.cleanup();
+              }
             }
             lastHeartbeat = now;
         }
@@ -538,7 +542,7 @@
     /**
      * The task is no longer running.  It may not have completed successfully
      */
-    void reportTaskFinished(String taskid) {
+    synchronized void reportTaskFinished(String taskid) {
         TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
         tip.taskFinished();
     }