Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/12/12 08:56:37 UTC

svn commit: r725939 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/ReduceTask.java

Author: ddas
Date: Thu Dec 11 23:56:37 2008
New Revision: 725939

URL: http://svn.apache.org/viewvc?rev=725939&view=rev
Log:
HADOOP-4683. Fixes Reduce shuffle scheduler to invoke getMapCompletionEvents in a separate thread. Contributed by Jothi Padmanabhan.
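
In outline, the patch takes the map-completion-event polling that
fetchOutputs() previously did inline (throttled by lastPollTime and
MIN_POLL_INTERVAL) and moves it into a dedicated daemon thread,
GetMapEventsThread, which the main loop stops via a volatile flag plus
join(). A minimal, self-contained sketch of that pattern, using
hypothetical names (EventPoller, poll, shutdown) rather than the actual
Hadoop classes:

    import java.util.Collections;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class EventPoller extends Thread {

      // Host -> output locations, shared with the scheduling loop.
      // The outer map is concurrent; each value list is synchronized.
      private final Map<String, List<String>> locations =
          new ConcurrentHashMap<String, List<String>>();

      // Flipped by the main loop when all outputs have been copied.
      private volatile boolean exit = false;

      private static final long SLEEP_TIME = 1000;

      public EventPoller() {
        setName("Thread for polling events");
        setDaemon(true);            // must not block JVM exit
      }

      @Override
      public void run() {
        do {
          try {
            poll();                 // stands in for the umbilical RPC
            Thread.sleep(SLEEP_TIME);
          } catch (InterruptedException ie) {
            return;                 // interrupted while sleeping: stop
          } catch (Throwable t) {
            // A transient failure must not kill the poller; log, retry.
          }
        } while (!exit);
      }

      // Stand-in for getMapCompletionEvents(); only this thread writes
      // to the map, so the get-then-put below is race-free.
      private void poll() {
        String host = "host1";      // hypothetical event payload
        List<String> list = locations.get(host);
        if (list == null) {
          list = Collections.synchronizedList(new LinkedList<String>());
          locations.put(host, list);
        }
        list.add("attempt_0001");
      }

      // Mirrors the exitGetMapEvents = true; join() handshake added to
      // fetchOutputs() below.
      public void shutdown() throws InterruptedException {
        exit = true;
        join();
      }
    }

Because the loop checks the flag only between sleeps, shutdown() can lag
by up to SLEEP_TIME; the patch accepts the same latency and simply
join()s the thread.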

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=725939&r1=725938&r2=725939&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Dec 11 23:56:37 2008
@@ -233,6 +233,9 @@
     based on the rack/host that has the most number of bytes.
     (Jothi Padmanabhan via ddas)
 
+    HADOOP-4683. Fixes Reduce shuffle scheduler to invoke getMapCompletionEvents
+    in a separate thread. (Jothi Padmanabhan via ddas)
+
   BUG FIXES
 
     HADOOP-4204. Fix findbugs warnings related to unused variables, naive

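A note on the data-structure change in the ReduceTask.java diff below:
mapLocations is promoted from a local HashMap inside fetchOutputs() to a
field shared between the scheduling loop and the new poller thread, so
it becomes a ConcurrentHashMap whose values are synchronized lists. One
subtlety: Collections.synchronizedList() makes individual calls atomic,
but iteration still requires holding the list's lock, which is why the
scheduling loop wraps its iterator in synchronized (knownOutputsByLoc).
A small sketch of that discipline, with hypothetical names and data:

    import java.util.Collections;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class SharedLocations {
      public static void main(String[] args) {
        Map<String, List<String>> locations =
            new ConcurrentHashMap<String, List<String>>();
        locations.put("host1",
            Collections.synchronizedList(new LinkedList<String>()));
        locations.get("host1").add("attempt_0001");  // atomic on its own

        List<String> list = locations.get("host1");
        // Iteration is NOT atomic: hold the list's lock for the whole
        // traversal, as the patch does with synchronized (knownOutputsByLoc).
        synchronized (list) {
          for (Iterator<String> it = list.iterator(); it.hasNext();) {
            System.out.println(it.next());
            it.remove();            // safe inside the synchronized block
          }
        }
      }
    }
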
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=725939&r1=725938&r2=725939&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Thu Dec 11 23:56:37 2008
@@ -44,6 +44,7 @@
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -514,11 +515,6 @@
     private Set<String> uniqueHosts;
     
     /**
-     * the last time we polled the job tracker
-     */
-    private long lastPollTime;
-    
-    /**
      * A reference to the RamManager for writing the map outputs to.
      */
     
@@ -544,6 +540,11 @@
      * A flag to indicate when to exit localFS merge
      */
     private volatile boolean exitLocalFSMerge = false;
+
+    /** 
+     * A flag to indicate when to exit getMapEvents thread 
+     */
+    private volatile boolean exitGetMapEvents = false;
     
     /**
      * When we accumulate maxInMemOutputs number of files in ram, we merge/spill
@@ -604,7 +605,7 @@
     /**
      * Maximum number of fetch-retries per-map.
      */
-    private int maxFetchRetriesPerMap;
+    private volatile int maxFetchRetriesPerMap;
     
     /**
      * Combiner class to run during in-memory merge, if defined.
@@ -671,7 +672,13 @@
     private final List<MapOutput> mapOutputsFilesInMemory =
       Collections.synchronizedList(new LinkedList<MapOutput>());
     
-
+    /**
+     * The map for (Hosts, List of MapIds from this Host) maintaining
+     * map output locations
+     */
+    private final Map<String, List<MapOutputLocation>> mapLocations = 
+      new ConcurrentHashMap<String, List<MapOutputLocation>>();
+    
     /**
      * This class contains the methods that should be used for metrics-reporting
      * the specific metrics for shuffle. This class actually reports the
@@ -1611,9 +1618,6 @@
       // hostnames
       this.uniqueHosts = new HashSet<String>();
       
-      this.lastPollTime = 0;
-      
-
       // Seed the random number generator with a reasonably globally unique seed
       long randomSeed = System.nanoTime() + 
                         (long)Math.pow(this.reduceTask.getPartition(),
@@ -1630,9 +1634,6 @@
     long           reducerInputBytes = 0;
     
     public boolean fetchOutputs() throws IOException {
-      //The map for (Hosts, List of MapIds from this Host)
-      HashMap<String, List<MapOutputLocation>> mapLocations = 
-        new HashMap<String, List<MapOutputLocation>>();
       int totalFailures = 0;
       int            numInFlight = 0, numCopied = 0;
       long           bytesTransferred = 0;
@@ -1641,6 +1642,7 @@
         reduceTask.getProgress().phase();
       LocalFSMerger localFSMergerThread = null;
       InMemFSMergeThread inMemFSMergeThread = null;
+      GetMapEventsThread getMapEventsThread = null;
       
       for (int i = 0; i < numMaps; i++) {
         copyPhase.addPhase();       // add sub-phase per file
@@ -1664,15 +1666,15 @@
       localFSMergerThread.start();
       inMemFSMergeThread.start();
       
+      // start the map events thread
+      getMapEventsThread = new GetMapEventsThread();
+      getMapEventsThread.start();
+      
       // start the clock for bandwidth measurement
       long startTime = System.currentTimeMillis();
       long currentTime = startTime;
       long lastProgressTime = startTime;
       long lastOutputTime = 0;
-      IntWritable fromEventId = new IntWritable(0);
-
-      //List of unique hosts containing map outputs
-      List<String> hostList = new ArrayList<String>();
       
         // loop until we get all required outputs
         while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) {
@@ -1688,73 +1690,43 @@
                    + (numMaps - copiedMapOutputs.size()) + " map output(s) "
                    + "where " + numInFlight + " is already in progress");
           }
-          
-          try {
-            // Put the hash entries for the failed fetches.
-            Iterator<MapOutputLocation> locItr = retryFetches.iterator();
-            while (locItr.hasNext()) {
-              MapOutputLocation loc = locItr.next(); 
-              List<MapOutputLocation> locList = 
-                mapLocations.get(loc.getHost());
-              if (locList == null) {
-                locList = new LinkedList<MapOutputLocation>();
-                mapLocations.put(loc.getHost(), locList);
-                hostList.add(loc.getHost());
-              }
-              //Add to the beginning of the list so that this map is 
-              //tried again before the others and we can hasten the 
-              //re-execution of this map should there be a problem
-              locList.add(0, loc);
-            }
-             
-            // The call getMapCompletionEvents will update fromEventId to
-            // used for the next call to getMapCompletionEvents
-
-            int currentNumObsoleteMapIds = obsoleteMapIds.size();
-
-            int numNewOutputs = getMapCompletionEvents(fromEventId, 
-                                                       mapLocations, 
-                                                       hostList);
 
-            if (numNewOutputs > 0 || logNow) {
-              LOG.info(reduceTask.getTaskID() + ": " +  
-                  "Got " + numNewOutputs + 
-                  " new map-outputs"); 
-            }
-            
-            int numNewObsoleteMaps = obsoleteMapIds.size()-currentNumObsoleteMapIds;
+          // Put the hash entries for the failed fetches.
+          Iterator<MapOutputLocation> locItr = retryFetches.iterator();
 
-            if (numNewObsoleteMaps > 0) {
-              LOG.info(reduceTask.getTaskID() + ": " +  
-                  "Got " + numNewObsoleteMaps + 
-                  " obsolete map-outputs from tasktracker ");  
-            }
-            
-            if (retryFetches.size() > 0) {
-              LOG.info(reduceTask.getTaskID() + ": " +  
+          while (locItr.hasNext()) {
+            MapOutputLocation loc = locItr.next(); 
+            List<MapOutputLocation> locList = 
+              mapLocations.get(loc.getHost());
+            //Add to the beginning of the list so that this map is 
+            //tried again before the others and we can hasten the 
+            //re-execution of this map should there be a problem
+            locList.add(0, loc);
+          }
+
+          if (retryFetches.size() > 0) {
+            LOG.info(reduceTask.getTaskID() + ": " +  
                   "Got " + retryFetches.size() +
                   " map-outputs from previous failures");
-            }
-            // clear the "failed" fetches hashmap
-            retryFetches.clear();
-          }
-          catch (IOException ie) {
-            LOG.warn(reduceTask.getTaskID() +
-                    " Problem locating map outputs: " +
-                    StringUtils.stringifyException(ie));
           }
-          
+          // clear the "failed" fetches hashmap
+          retryFetches.clear();
+
           // now walk through the cache and schedule what we can
           int numScheduled = 0;
           int numDups = 0;
           
           synchronized (scheduledCopies) {
-
+  
             // Randomize the map output locations to prevent 
             // all reduce-tasks swamping the same tasktracker
-            Collections.shuffle(hostList, this.random);
+            List<String> hostList = new ArrayList<String>();
+            hostList.addAll(mapLocations.keySet()); 
             
+            Collections.shuffle(hostList, this.random);
+              
             Iterator<String> hostsItr = hostList.iterator();
+
             while (hostsItr.hasNext()) {
             
               String host = hostsItr.next();
@@ -1762,9 +1734,13 @@
               List<MapOutputLocation> knownOutputsByLoc = 
                 mapLocations.get(host);
 
+              if (knownOutputsByLoc.size() == 0) {
+                continue;
+              }
+              
               //Identify duplicate hosts here
               if (uniqueHosts.contains(host)) {
-                 numDups += knownOutputsByLoc.size() -1; 
+                 numDups += knownOutputsByLoc.size(); 
                  continue;
               }
 
@@ -1782,34 +1758,32 @@
               if (penalized)
                 continue;
 
-              Iterator<MapOutputLocation> locItr = 
-                knownOutputsByLoc.iterator();
+              synchronized (knownOutputsByLoc) {
+              
+                locItr = knownOutputsByLoc.iterator();
             
-              while (locItr.hasNext()) {
+                while (locItr.hasNext()) {
               
-                MapOutputLocation loc = locItr.next();
+                  MapOutputLocation loc = locItr.next();
               
-                // Do not schedule fetches from OBSOLETE maps
-                if (obsoleteMapIds.contains(loc.getTaskAttemptId())) {
-                  locItr.remove();
-                  continue;
-                }
-
-                uniqueHosts.add(host);
-                scheduledCopies.add(loc);
-                locItr.remove();  // remove from knownOutputs
-                numInFlight++; numScheduled++;
+                  // Do not schedule fetches from OBSOLETE maps
+                  if (obsoleteMapIds.contains(loc.getTaskAttemptId())) {
+                    locItr.remove();
+                    continue;
+                  }
+
+                  uniqueHosts.add(host);
+                  scheduledCopies.add(loc);
+                  locItr.remove();  // remove from knownOutputs
+                  numInFlight++; numScheduled++;
 
-                break; //we have a map from this host
-              }
-   	
-              if (knownOutputsByLoc.size() == 0) {
-                mapLocations.remove(host);
-                hostsItr.remove();
+                  break; //we have a map from this host
+                }
               }
             }
             scheduledCopies.notifyAll();
           }
+
           if (numScheduled > 0 || logNow) {
             LOG.info(reduceTask.getTaskID() + " Scheduled " + numScheduled +
                    " outputs (" + penaltyBox.size() +
@@ -1823,7 +1797,7 @@
                   ((penaltyBox.get(host) - currentTime)/1000) + " seconds.");
             }
           }
-          
+
           // if we have no copies in flight and we can't schedule anything
           // new, just wait for a bit
           try {
@@ -1869,7 +1843,7 @@
                                   + " at " +
                                   mbpsFormat.format(transferRate) +  " MB/s)");
                 
-              // Note successfull fetch for this mapId to invalidate
+              // Note successful fetch for this mapId to invalidate
               // (possibly) old fetch-failures
               fetchFailedMaps.remove(cr.getLocation().getTaskId());
             } else if (cr.isObsolete()) {
@@ -1974,6 +1948,15 @@
         }
         
         // all done, inform the copiers to exit
+        exitGetMapEvents = true;
+        try {
+          getMapEventsThread.join();
+          LOG.info("getMapsEventsThread joined.");
+        } catch (Throwable t) {
+          LOG.info("getMapsEventsThread threw an exception: " +
+              StringUtils.stringifyException(t));
+        }
+
         synchronized (copiers) {
           synchronized (scheduledCopies) {
             for (MapOutputCopier copier : copiers) {
@@ -2218,109 +2201,6 @@
       }
     }
     
-    /** 
-     * Queries the {@link TaskTracker} for a set of map-completion events from
-     * a given event ID.
-     * 
-     * @param fromEventId the first event ID we want to start from, this is
-     *                     modified by the call to this method
-     * @param mapLocations the hash map of map locations by host
-     * @param hostsList    the list that contains unique hosts having 
-     *                     map outputs, will be updated on the return 
-     *                     of this method
-     * @return the number of new map-completion events from the given event ID 
-     * @throws IOException
-     */  
-    private int getMapCompletionEvents(IntWritable fromEventId, 
-                        HashMap<String,List<MapOutputLocation>> mapLocations,
-                        List<String> hostsList)
-    throws IOException {
-      
-      long currentTime = System.currentTimeMillis();    
-      long pollTime = lastPollTime + MIN_POLL_INTERVAL;
-      int numNewMaps = 0;
-      while (currentTime < pollTime) {
-        try {
-          Thread.sleep(pollTime-currentTime);
-        } catch (InterruptedException ie) { } // IGNORE
-        currentTime = System.currentTimeMillis();
-      }
-      
-      MapTaskCompletionEventsUpdate update = 
-        umbilical.getMapCompletionEvents(reduceTask.getJobID(), 
-                                         fromEventId.get(), MAX_EVENTS_TO_FETCH,
-                                         reduceTask.getTaskID());
-      TaskCompletionEvent events[] = update.getMapTaskCompletionEvents();
-        
-      // Check if the reset is required.
-      // Since there is no ordering of the task completion events at the 
-      // reducer, the only option to sync with the new jobtracker is to reset 
-      // the events index
-      if (update.shouldReset()) {
-        fromEventId.set(0);
-        obsoleteMapIds.clear(); // clear the obsolete map
-      }
-      
-      // Note the last successful poll time-stamp
-      lastPollTime = currentTime;
-
-      // Update the last seen event ID
-      fromEventId.set(fromEventId.get() + events.length);
-      
-      // Process the TaskCompletionEvents:
-      // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
-      // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop fetching
-      //    from those maps.
-      // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
-      //    outputs at all.
-      for (TaskCompletionEvent event : events) {
-        switch (event.getTaskStatus()) {
-          case SUCCEEDED:
-          {
-            URI u = URI.create(event.getTaskTrackerHttp());
-            String host = u.getHost();
-            TaskAttemptID taskId = event.getTaskAttemptId();
-            int duration = event.getTaskRunTime();
-            if (duration > maxMapRuntime) {
-              maxMapRuntime = duration; 
-              // adjust max-fetch-retries based on max-map-run-time
-              maxFetchRetriesPerMap = Math.max(MIN_FETCH_RETRIES_PER_MAP, 
-                getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1));
-            }
-            URL mapOutputLocation = new URL(event.getTaskTrackerHttp() + 
-                                    "/mapOutput?job=" + taskId.getJobID() +
-                                    "&map=" + taskId + 
-                                    "&reduce=" + getPartition());
-            List<MapOutputLocation> loc = mapLocations.get(host);
-            if (loc == null) {
-              loc = new LinkedList<MapOutputLocation>();
-              mapLocations.put(host, loc);
-              hostsList.add(host);
-             }
-            loc.add(new MapOutputLocation(taskId, host, mapOutputLocation));
-            numNewMaps ++;
-          }
-          break;
-          case FAILED:
-          case KILLED:
-          case OBSOLETE:
-          {
-            obsoleteMapIds.add(event.getTaskAttemptId());
-            LOG.info("Ignoring obsolete output of " + event.getTaskStatus() + 
-                     " map-task: '" + event.getTaskAttemptId() + "'");
-          }
-          break;
-          case TIPFAILED:
-          {
-            copiedMapOutputs.add(event.getTaskAttemptId().getTaskID());
-            LOG.info("Ignoring output of failed map TIP: '" +  
-            		 event.getTaskAttemptId() + "'");
-          }
-          break;
-        }
-      }
-      return numNewMaps;
-    }
     
     
     /** Starts merging the local copy (on disk) of the map's output so that
@@ -2552,6 +2432,130 @@
       }
     }
 
+    private class GetMapEventsThread extends Thread {
+      
+      private IntWritable fromEventId = new IntWritable(0);
+      private static final long SLEEP_TIME = 1000;
+      
+      public GetMapEventsThread() {
+        setName("Thread for polling Map Completion Events");
+        setDaemon(true);
+      }
+      
+      @Override
+      public void run() {
+      
+        LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
+        
+        do {
+          try {
+            int numNewMaps = getMapCompletionEvents();
+            if (numNewMaps > 0) {
+              LOG.info(reduceTask.getTaskID() + ": " +  
+                  "Got " + numNewMaps + " new map-outputs"); 
+            }
+            Thread.sleep(SLEEP_TIME);
+          } 
+          catch (InterruptedException e) {
+            LOG.warn(reduceTask.getTaskID() +
+                " GetMapEventsThread returning after an " +
+                " interrupted exception");
+            return;
+          }
+          catch (Throwable t) {
+            LOG.warn(reduceTask.getTaskID() +
+                " GetMapEventsThread Ignoring exception : " +
+                StringUtils.stringifyException(t));
+          }
+        } while (!exitGetMapEvents);
+
+        LOG.info("GetMapEventsThread exiting");
+      
+      }
+      
+      /** 
+       * Queries the {@link TaskTracker} for a set of map-completion events 
+       * from a given event ID.
+       * @throws IOException
+       */  
+      private int getMapCompletionEvents() throws IOException {
+        
+        int numNewMaps = 0;
+        
+        MapTaskCompletionEventsUpdate update = 
+          umbilical.getMapCompletionEvents(reduceTask.getJobID(), 
+                                           fromEventId.get(), 
+                                           MAX_EVENTS_TO_FETCH,
+                                           reduceTask.getTaskID());
+        TaskCompletionEvent events[] = update.getMapTaskCompletionEvents();
+          
+        // Check if the reset is required.
+        // Since there is no ordering of the task completion events at the 
+        // reducer, the only option to sync with the new jobtracker is to reset 
+        // the events index
+        if (update.shouldReset()) {
+          fromEventId.set(0);
+          obsoleteMapIds.clear(); // clear the obsolete map
+        }
+        
+        // Update the last seen event ID
+        fromEventId.set(fromEventId.get() + events.length);
+        
+        // Process the TaskCompletionEvents:
+        // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
+        // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop 
+        //    fetching from those maps.
+        // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
+        //    outputs at all.
+        for (TaskCompletionEvent event : events) {
+          switch (event.getTaskStatus()) {
+            case SUCCEEDED:
+            {
+              URI u = URI.create(event.getTaskTrackerHttp());
+              String host = u.getHost();
+              TaskAttemptID taskId = event.getTaskAttemptId();
+              int duration = event.getTaskRunTime();
+              if (duration > maxMapRuntime) {
+                maxMapRuntime = duration; 
+                // adjust max-fetch-retries based on max-map-run-time
+                maxFetchRetriesPerMap = Math.max(MIN_FETCH_RETRIES_PER_MAP, 
+                  getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1));
+              }
+              URL mapOutputLocation = new URL(event.getTaskTrackerHttp() + 
+                                      "/mapOutput?job=" + taskId.getJobID() +
+                                      "&map=" + taskId + 
+                                      "&reduce=" + getPartition());
+              List<MapOutputLocation> loc = mapLocations.get(host);
+              if (loc == null) {
+                loc = Collections.synchronizedList
+                  (new LinkedList<MapOutputLocation>());
+                mapLocations.put(host, loc);
+               }
+              loc.add(new MapOutputLocation(taskId, host, mapOutputLocation));
+              numNewMaps ++;
+            }
+            break;
+            case FAILED:
+            case KILLED:
+            case OBSOLETE:
+            {
+              obsoleteMapIds.add(event.getTaskAttemptId());
+              LOG.info("Ignoring obsolete output of " + event.getTaskStatus() + 
+                       " map-task: '" + event.getTaskAttemptId() + "'");
+            }
+            break;
+            case TIPFAILED:
+            {
+              copiedMapOutputs.add(event.getTaskAttemptId().getTaskID());
+              LOG.info("Ignoring output of failed map TIP: '" +  
+                   event.getTaskAttemptId() + "'");
+            }
+            break;
+          }
+        }
+        return numNewMaps;
+      }
+    }
   }
 
   /**
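
One last detail worth calling out from the diff above: maxFetchRetriesPerMap
becomes volatile. Before the patch it was written and read on the same
thread; afterwards it is written by GetMapEventsThread (whenever an event
reports a new maximum map runtime) and read on other threads, so volatile
is needed for the readers to see the update. A minimal sketch of that
visibility rule; the names and the power-of-two helper are stand-ins, not
the real getClosestPowerOf2:

    public class RetryTuning {
      // Written only by the polling thread, read by other threads;
      // volatile guarantees readers see the latest value.
      private volatile int maxFetchRetries = 2;

      // Touched only by the polling thread, so no volatile needed.
      private int maxRuntimeMs = 0;

      // Round up to the next power of two (a stand-in for the patch's
      // getClosestPowerOf2 helper, whose exact rounding may differ).
      private static int nextPowerOf2(int n) {
        int p = 1;
        while (p < n) {
          p <<= 1;
        }
        return p;
      }

      // Called from the polling thread when an event reports a longer run.
      void onMapRuntime(int runtimeMs, int backoffInitMs, int minRetries) {
        if (runtimeMs > maxRuntimeMs) {
          maxRuntimeMs = runtimeMs;
          maxFetchRetries = Math.max(minRetries,
              nextPowerOf2((maxRuntimeMs / backoffInitMs) + 1));
        }
      }

      // Called from the reader threads.
      int retriesAllowed() {
        return maxFetchRetries;
      }
    }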