You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/05/15 20:42:37 UTC

svn commit: r406706 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/ipc/RPC.java src/java/org/apache/hadoop/mapred/TaskTracker.java

Author: cutting
Date: Mon May 15 11:42:36 2006
New Revision: 406706

URL: http://svn.apache.org/viewcvs?rev=406706&view=rev
Log:
HADOOP-180.  Queue task cleanups so that TaskTracker remains responsive.  Contributed by Owen.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=406706&r1=406705&r2=406706&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon May 15 11:42:36 2006
@@ -16,6 +16,11 @@
     id that no existing blocks already have that id.
     (Milind Bhandarkar via cutting)
 
+ 5. HADOOP-180. Make a daemon thread that does the actual task cleanups, so
+    that the main offerService thread in the TaskTracker doesn't get stuck
+    and miss its heartbeat window. This was killing many task trackers as
+    big jobs finished (300+ tasks / node). (omalley via cutting)
+
 
 Release 0.2.1 - 2006-05-12
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=406706&r1=406705&r2=406706&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java Mon May 15 11:42:36 2006
@@ -23,7 +23,7 @@
 import java.lang.reflect.InvocationTargetException;
 
 import java.net.InetSocketAddress;
-import java.util.logging.Logger;
+import java.util.logging.*;
 import java.io.*;
 
 import org.apache.hadoop.io.*;
@@ -146,8 +146,11 @@
 
     public Object invoke(Object proxy, Method method, Object[] args)
       throws Throwable {
+      long startTime = System.currentTimeMillis();
       ObjectWritable value = (ObjectWritable)
         client.call(new Invocation(method, args), address);
+      long callTime = System.currentTimeMillis() - startTime;
+      LOG.fine("Call: " + method.getName() + " " + callTime);  // client-side elapsed time of client.call, in ms
       return value.get();
     }
   }
@@ -240,7 +243,7 @@
         long startTime = System.currentTimeMillis();
         Object value = method.invoke(instance, call.getParameters());
         long callTime = System.currentTimeMillis() - startTime;
-        LOG.fine("Call: " + call.getMethodName() + " " + callTime);
+        LOG.fine("Served: " + call.getMethodName() + " " + callTime);
         if (verbose) log("Return: "+value);
 
         return new ObjectWritable(method.getReturnType(), value);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=406706&r1=406705&r2=406706&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon May 15 11:42:36 2006
@@ -72,7 +72,33 @@
 
     private int maxCurrentTasks;
     private int failures;
-
+    
+    /**
+     * Tips (TaskInProgress) waiting for taskCleanupThread to call jobHasFinished().
+     */
+    private BlockingQueue tasksToCleanup = new BlockingQueue();
+    
+    /**
+     * Daemon thread that takes tips off tasksToCleanup and calls jobHasFinished() on each.
+     */
+    private Thread taskCleanupThread = 
+      new Thread(new Runnable() {
+        public void run() {
+          while (true) {  // never exits; setDaemon(true) below keeps it from blocking JVM exit
+            try {
+              TaskInProgress tip = (TaskInProgress) tasksToCleanup.take();  // blocks until a tip is queued
+              tip.jobHasFinished();
+            } catch (Throwable except) {  // log and continue so one failing tip does not kill the thread
+              LOG.warning(StringUtils.stringifyException(except));
+            }
+          }
+        }
+      });
+    { // instance initializer: runs once per TaskTracker construction, not on each initialize()
+      taskCleanupThread.setDaemon(true);
+      taskCleanupThread.start();  // NOTE(review): started from the constructor, so TaskTracker.this escapes before construction completes
+    }
+    
     class MapOutputServer extends RPC.Server {
       private MapOutputServer(int port, int threads) {
         super(TaskTracker.this, fConf, port, threads, false);
@@ -108,11 +134,8 @@
      * so we can call it again and "recycle" the object after calling
      * close().
      */
-    void initialize() throws IOException {
+    synchronized void initialize() throws IOException {
         this.localHostname = InetAddress.getLocalHost().getHostName();
-        this.taskTrackerName = "tracker_" + localHostname + "_" +
-                               (Math.abs(r.nextInt()) % 100000);
-        LOG.info("Starting tracker " + taskTrackerName);
 
         fConf.deleteLocalFiles(SUBDIR);
 
@@ -148,6 +171,9 @@
                 this.mapOutputPort++;
             }
         }
+        this.taskTrackerName = "tracker_" + 
+                               localHostname + ":" + taskReportPort;
+        LOG.info("Starting tracker " + taskTrackerName);
 
         // Clear out temporary files that might be lying around
         this.mapOutputFile.cleanupStorage();
@@ -323,12 +349,11 @@
             if (toCloseIds != null) {
               synchronized (this) {
                 for (int i = 0; i < toCloseIds.length; i++) {
-                  TaskInProgress tip = (TaskInProgress) tasks.get(toCloseIds[i]);
-                  try {
-                    tip.jobHasFinished();
-                  } catch (IOException ie) {
-                    LOG.info("problem finishing task: " +
-                             StringUtils.stringifyException(ie));
+                  Object tip = tasks.get(toCloseIds[i]);
+                  if (tip != null) {
+                    tasksToCleanup.put(tip);
+                  } else {
+                    LOG.info("Attempt to cleanup unknown tip " + toCloseIds[i]);
                   }
                 }
               }
@@ -376,7 +401,6 @@
       }
     }
     
-
     /**
      * The server retry loop.  
      * This while-loop attempts to connect to the JobTracker.  It only 
@@ -414,6 +438,51 @@
         }
     }
 
+    /**
+     * Unbounded FIFO queue shared between producer and consumer threads:
+     * put() never blocks; take() waits until an element is available.
+     * @author Owen O'Malley
+     */
+    static private class BlockingQueue {
+      private List queue;  // FIFO storage, head at index 0; guarded by its own monitor
+      
+      /**
+       * Create an empty queue.
+       */
+      public BlockingQueue() {
+        queue = new ArrayList();
+      }
+       
+      /**
+       * Put the given object at the back of the queue.
+       * @param obj the object to enqueue; one thread waiting in take() is woken
+       */
+      public void put(Object obj) {
+        synchronized (queue) {
+          queue.add(obj);
+          queue.notify();
+        }
+      }
+      
+      /**
+       * Take the object at the front of the queue.
+       * It blocks until there is an object available.
+       * @return the head of the queue
+       */
+      public Object take() {
+        synchronized (queue) {
+          while (queue.isEmpty()) {
+            try {
+              queue.wait();
+            } catch (InterruptedException ie) {}  // NOTE(review): interrupt swallowed and status not restored, so take() cannot be cancelled
+          }
+          Object result = queue.get(0);
+          queue.remove(0);  // NOTE(review): ArrayList.remove(0) shifts remaining elements, O(n) per take
+          return result;
+        }
+      }
+    }
+    
     ///////////////////////////////////////////////////////
     // TaskInProgress maintains all the info for a Task that
     // lives at this TaskTracker.  It maintains the Task object,
@@ -641,13 +710,19 @@
          * controlling job is all done and the files have been copied
          * away, or the task failed and we don't need the remains.
          */
-        synchronized void cleanup() throws IOException {
-            tasks.remove(task.getTaskId());
-            try {
-                runner.close();
-            } catch (IOException ie) {
+        void cleanup() throws IOException {
+            String taskId = task.getTaskId();
+            LOG.fine("Cleaning up " + taskId);
+            synchronized (TaskTracker.this) {  // lock order: TaskTracker first, then this tip
+               tasks.remove(taskId);
+               synchronized (this) {
+                 try {
+                    runner.close();
+                 } catch (Throwable ie) {  // NOTE(review): any failure from runner.close() is silently discarded
+                 }
+               }
             }
-            this.defaultJobConf.deleteLocalFiles(SUBDIR + "/" + task.getTaskId());
+            this.defaultJobConf.deleteLocalFiles(SUBDIR + "/" + taskId);  // runs after both locks are released
         }
     }