You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/03/12 17:58:49 UTC

svn commit: r752934 - in /hadoop/core/branches/branch-0.20: ./ src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/ src/core/org/apache/hadoop/net/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: ddas
Date: Thu Mar 12 16:58:48 2009
New Revision: 752934

URL: http://svn.apache.org/viewvc?rev=752934&view=rev
Log:
Merge -r 752931:752932 from trunk onto 0.20 branch. Fixes HADOOP-4664.

Added:
    hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java
      - copied unchanged from r752932, hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java
Modified:
    hadoop/core/branches/branch-0.20/   (props changed)
    hadoop/core/branches/branch-0.20/CHANGES.txt   (contents, props changed)
    hadoop/core/branches/branch-0.20/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobHistory.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java

Propchange: hadoop/core/branches/branch-0.20/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 12 16:58:48 2009
@@ -1,2 +1,2 @@
 /hadoop/core/branches/branch-0.19:713112
-/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746902-746903,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752609,752834,752836,752913
+/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746902-746903,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752609,752834,752836,752913,752932

Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=752934&r1=752933&r2=752934&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Thu Mar 12 16:58:48 2009
@@ -335,6 +335,10 @@
     HADOOP-5248. A testcase that checks for the existence of job directory
     after the job completes. Fails if it exists. (ddas)
 
+    HADOOP-4664. Introduces multiple job initialization threads, where the 
+    number of threads is configurable via mapred.jobinit.threads.
+    (Matei Zaharia and Jothi Padmanabhan via ddas)
+
   OPTIMIZATIONS
 
     HADOOP-3293. Fixes FileInputFormat to do provide locations for splits

Propchange: hadoop/core/branches/branch-0.20/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 12 16:58:48 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.18/CHANGES.txt:727226
 /hadoop/core/branches/branch-0.19/CHANGES.txt:713112
-/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752514,752555,752590,752609,752834,752836,752913
+/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752514,752555,752590,752609,752834,752836,752913,752932

Modified: hadoop/core/branches/branch-0.20/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=752934&r1=752933&r2=752934&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/core/branches/branch-0.20/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Thu Mar 12 16:58:48 2009
@@ -105,7 +105,6 @@
   protected FairScheduler(Clock clock, boolean runBackgroundUpdates) {
     this.clock = clock;
     this.runBackgroundUpdates = runBackgroundUpdates;
-    this.eagerInitListener = new EagerTaskInitializationListener();
     this.jobListener = new JobListener();
   }
 
@@ -113,6 +112,7 @@
   public void start() {
     try {
       Configuration conf = getConf();
+      this.eagerInitListener = new EagerTaskInitializationListener(conf);
       eagerInitListener.start();
       taskTrackerManager.addJobInProgressListener(eagerInitListener);
       taskTrackerManager.addJobInProgressListener(jobListener);

Modified: hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java?rev=752934&r1=752933&r2=752934&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java (original)
+++ hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java Thu Mar 12 16:58:48 2009
@@ -20,7 +20,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * A cached implementation of DNSToSwitchMapping that takes an
@@ -30,7 +30,7 @@
  *
  */
 public class CachedDNSToSwitchMapping implements DNSToSwitchMapping {
-  private Map<String, String> cache = new TreeMap<String, String>();
+  private Map<String, String> cache = new ConcurrentHashMap<String, String>();
   protected DNSToSwitchMapping rawMapping;
   
   public CachedDNSToSwitchMapping(DNSToSwitchMapping rawMapping) {

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java?rev=752934&r1=752933&r2=752934&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java Thu Mar 12 16:58:48 2009
@@ -22,9 +22,12 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.util.StringUtils;
 
@@ -34,53 +37,88 @@
  */
 class EagerTaskInitializationListener extends JobInProgressListener {
   
+  private static final int DEFAULT_NUM_THREADS = 4;
   private static final Log LOG = LogFactory.getLog(
       EagerTaskInitializationListener.class.getName());
   
   /////////////////////////////////////////////////////////////////
   //  Used to init new jobs that have just been created
   /////////////////////////////////////////////////////////////////
-  class JobInitThread implements Runnable {
+  class JobInitManager implements Runnable {
+   
     public void run() {
-      JobInProgress job;
+      JobInProgress job = null;
       while (true) {
-        job = null;
         try {
           synchronized (jobInitQueue) {
-            while (jobInitQueue.isEmpty()) {
+            while (jobInitQueue.isEmpty() && !exitFlag) {
               jobInitQueue.wait();
             }
-            job = jobInitQueue.remove(0);
+            if (exitFlag) {
+              break;
+            }
           }
-          job.initTasks();
+          job = jobInitQueue.remove(0);
+          threadPool.execute(new InitJob(job));
         } catch (InterruptedException t) {
+          LOG.info("JobInitManagerThread interrupted.");
           break;
-        } catch (Throwable t) {
-          LOG.error("Job initialization failed:\n" +
-                    StringUtils.stringifyException(t));
-          if (job != null) {
-            job.fail();
-          }
+        } 
+      }
+      LOG.info("Shutting down thread pool");
+      threadPool.shutdownNow();
+    }
+  }
+  
+  static class InitJob implements Runnable {
+  
+    private JobInProgress job;
+    
+    public InitJob(JobInProgress job) {
+      this.job = job;
+    }
+    
+    public void run() {
+      try {
+        LOG.info("Initializing " + job.getJobID());
+        job.initTasks();
+      } catch (Throwable t) {
+        LOG.error("Job initialization failed:\n" +
+            StringUtils.stringifyException(t));
+        if (job != null) {
+          job.fail();
         }
       }
     }
   }
   
-  private JobInitThread initJobs = new JobInitThread();
-  private Thread initJobsThread;
+  private JobInitManager jobInitManager = new JobInitManager();
+  private Thread jobInitManagerThread;
   private List<JobInProgress> jobInitQueue = new ArrayList<JobInProgress>();
+  private ExecutorService threadPool;
+  private int numThreads;
+  private boolean exitFlag = false;
+  
+  public EagerTaskInitializationListener(Configuration conf) {
+    numThreads = conf.getInt("mapred.jobinit.threads", DEFAULT_NUM_THREADS);
+    threadPool = Executors.newFixedThreadPool(numThreads);
+  }
   
   public void start() throws IOException {
-    this.initJobsThread = new Thread(initJobs, "initJobs");
-    this.initJobsThread.start();
+    this.jobInitManagerThread = new Thread(jobInitManager, "jobInitManager");
+    jobInitManagerThread.setDaemon(true);
+    this.jobInitManagerThread.start();
   }
   
   public void terminate() throws IOException {
-    if (this.initJobsThread != null && this.initJobsThread.isAlive()) {
-      LOG.info("Stopping initer");
-      this.initJobsThread.interrupt();
+    if (jobInitManagerThread != null && jobInitManagerThread.isAlive()) {
+      LOG.info("Stopping Job Init Manager thread");
+      synchronized (jobInitQueue) {
+        exitFlag = true;
+        jobInitQueue.notify();
+      }
       try {
-        this.initJobsThread.join();
+        jobInitManagerThread.join();
       } catch (InterruptedException ex) {
         ex.printStackTrace();
       }

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=752934&r1=752933&r2=752934&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobHistory.java Thu Mar 12 16:58:48 2009
@@ -32,6 +32,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -90,7 +91,7 @@
   private static String JOBTRACKER_UNIQUE_STRING = null;
   private static String LOG_DIR = null;
   private static Map<String, ArrayList<PrintWriter>> openJobs = 
-                     new HashMap<String, ArrayList<PrintWriter>>();
+                     new ConcurrentHashMap<String, ArrayList<PrintWriter>>();
   private static boolean disableHistory = false; 
   private static final String SECONDARY_FILE_SUFFIX = ".recover";
   private static long jobHistoryBlockSize = 0;

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=752934&r1=752933&r2=752934&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Thu Mar 12 16:58:48 2009
@@ -41,8 +41,6 @@
   
   public JobQueueTaskScheduler() {
     this.jobQueueJobInProgressListener = new JobQueueJobInProgressListener();
-    this.eagerTaskInitializationListener =
-      new EagerTaskInitializationListener();
   }
   
   @Override
@@ -74,6 +72,8 @@
     super.setConf(conf);
     padFraction = conf.getFloat("mapred.jobtracker.taskalloc.capacitypad", 
                                  0.01f);
+    this.eagerTaskInitializationListener =
+      new EagerTaskInitializationListener(conf);
   }
 
   @Override