You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/03/12 17:58:49 UTC
svn commit: r752934 - in /hadoop/core/branches/branch-0.20: ./
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/
src/core/org/apache/hadoop/net/ src/mapred/org/apache/hadoop/mapred/
src/test/org/apache/hadoop/mapred/
Author: ddas
Date: Thu Mar 12 16:58:48 2009
New Revision: 752934
URL: http://svn.apache.org/viewvc?rev=752934&view=rev
Log:
Merge -r 752931:752932 from trunk onto 0.20 branch. Fixes HADOOP-4664.
Added:
hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java
- copied unchanged from r752932, hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java
Modified:
hadoop/core/branches/branch-0.20/ (props changed)
hadoop/core/branches/branch-0.20/CHANGES.txt (contents, props changed)
hadoop/core/branches/branch-0.20/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobHistory.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
Propchange: hadoop/core/branches/branch-0.20/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 12 16:58:48 2009
@@ -1,2 +1,2 @@
/hadoop/core/branches/branch-0.19:713112
-/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746902-746903,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752609,752834,752836,752913
+/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746902-746903,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752609,752834,752836,752913,752932
Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=752934&r1=752933&r2=752934&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Thu Mar 12 16:58:48 2009
@@ -335,6 +335,10 @@
HADOOP-5248. A testcase that checks for the existence of job directory
after the job completes. Fails if it exists. (ddas)
+ HADOOP-4664. Introduces multiple job initialization threads, where the
+ number of threads is configurable via mapred.jobinit.threads.
+ (Matei Zaharia and Jothi Padmanabhan via ddas)
+
OPTIMIZATIONS
HADOOP-3293. Fixes FileInputFormat to do provide locations for splits
Propchange: hadoop/core/branches/branch-0.20/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 12 16:58:48 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.18/CHANGES.txt:727226
/hadoop/core/branches/branch-0.19/CHANGES.txt:713112
-/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752514,752555,752590,752609,752834,752836,752913
+/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752514,752555,752590,752609,752834,752836,752913,752932
Modified: hadoop/core/branches/branch-0.20/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=752934&r1=752933&r2=752934&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/core/branches/branch-0.20/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Thu Mar 12 16:58:48 2009
@@ -105,7 +105,6 @@
protected FairScheduler(Clock clock, boolean runBackgroundUpdates) {
this.clock = clock;
this.runBackgroundUpdates = runBackgroundUpdates;
- this.eagerInitListener = new EagerTaskInitializationListener();
this.jobListener = new JobListener();
}
@@ -113,6 +112,7 @@
public void start() {
try {
Configuration conf = getConf();
+ this.eagerInitListener = new EagerTaskInitializationListener(conf);
eagerInitListener.start();
taskTrackerManager.addJobInProgressListener(eagerInitListener);
taskTrackerManager.addJobInProgressListener(jobListener);
Modified: hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java?rev=752934&r1=752933&r2=752934&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java (original)
+++ hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java Thu Mar 12 16:58:48 2009
@@ -20,7 +20,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
/**
* A cached implementation of DNSToSwitchMapping that takes an
@@ -30,7 +30,7 @@
*
*/
public class CachedDNSToSwitchMapping implements DNSToSwitchMapping {
- private Map<String, String> cache = new TreeMap<String, String>();
+ private Map<String, String> cache = new ConcurrentHashMap<String, String>();
protected DNSToSwitchMapping rawMapping;
public CachedDNSToSwitchMapping(DNSToSwitchMapping rawMapping) {
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java?rev=752934&r1=752933&r2=752934&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java Thu Mar 12 16:58:48 2009
@@ -22,9 +22,12 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
import org.apache.hadoop.util.StringUtils;
@@ -34,53 +37,88 @@
*/
class EagerTaskInitializationListener extends JobInProgressListener {
+ private static final int DEFAULT_NUM_THREADS = 4;
private static final Log LOG = LogFactory.getLog(
EagerTaskInitializationListener.class.getName());
/////////////////////////////////////////////////////////////////
// Used to init new jobs that have just been created
/////////////////////////////////////////////////////////////////
- class JobInitThread implements Runnable {
+ class JobInitManager implements Runnable {
+
public void run() {
- JobInProgress job;
+ JobInProgress job = null;
while (true) {
- job = null;
try {
synchronized (jobInitQueue) {
- while (jobInitQueue.isEmpty()) {
+ while (jobInitQueue.isEmpty() && !exitFlag) {
jobInitQueue.wait();
}
+ if (exitFlag) {
+ break;
+ }
job = jobInitQueue.remove(0);
}
- job.initTasks();
+ threadPool.execute(new InitJob(job)); // initTasks() runs on the pool, outside the queue lock
} catch (InterruptedException t) {
+ LOG.info("JobInitManagerThread interrupted.");
break;
- } catch (Throwable t) {
- LOG.error("Job initialization failed:\n" +
- StringUtils.stringifyException(t));
- if (job != null) {
- job.fail();
- }
+ }
+ }
+ LOG.info("Shutting down thread pool");
+ threadPool.shutdownNow();
+ }
+ }
+
+ static class InitJob implements Runnable {
+
+ private JobInProgress job;
+
+ public InitJob(JobInProgress job) {
+ this.job = job;
+ }
+
+ public void run() {
+ try {
+ LOG.info("Initializing " + job.getJobID());
+ job.initTasks();
+ } catch (Throwable t) {
+ LOG.error("Job initialization failed:\n" +
+ StringUtils.stringifyException(t));
+ if (job != null) {
+ job.fail();
}
}
}
}
- private JobInitThread initJobs = new JobInitThread();
- private Thread initJobsThread;
+ private JobInitManager jobInitManager = new JobInitManager();
+ private Thread jobInitManagerThread;
private List<JobInProgress> jobInitQueue = new ArrayList<JobInProgress>();
+ private ExecutorService threadPool;
+ private int numThreads;
+ private boolean exitFlag = false;
+
+ public EagerTaskInitializationListener(Configuration conf) {
+ numThreads = conf.getInt("mapred.jobinit.threads", DEFAULT_NUM_THREADS);
+ threadPool = Executors.newFixedThreadPool(numThreads);
+ }
public void start() throws IOException {
- this.initJobsThread = new Thread(initJobs, "initJobs");
- this.initJobsThread.start();
+ this.jobInitManagerThread = new Thread(jobInitManager, "jobInitManager");
+ jobInitManagerThread.setDaemon(true);
+ this.jobInitManagerThread.start();
}
public void terminate() throws IOException {
- if (this.initJobsThread != null && this.initJobsThread.isAlive()) {
- LOG.info("Stopping initer");
- this.initJobsThread.interrupt();
+ if (jobInitManagerThread != null && jobInitManagerThread.isAlive()) {
+ LOG.info("Stopping Job Init Manager thread");
+ synchronized (jobInitQueue) {
+ exitFlag = true;
+ jobInitQueue.notify();
+ }
try {
- this.initJobsThread.join();
+ jobInitManagerThread.join();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=752934&r1=752933&r2=752934&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobHistory.java Thu Mar 12 16:58:48 2009
@@ -32,6 +32,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -90,7 +91,7 @@
private static String JOBTRACKER_UNIQUE_STRING = null;
private static String LOG_DIR = null;
private static Map<String, ArrayList<PrintWriter>> openJobs =
- new HashMap<String, ArrayList<PrintWriter>>();
+ new ConcurrentHashMap<String, ArrayList<PrintWriter>>();
private static boolean disableHistory = false;
private static final String SECONDARY_FILE_SUFFIX = ".recover";
private static long jobHistoryBlockSize = 0;
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=752934&r1=752933&r2=752934&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Thu Mar 12 16:58:48 2009
@@ -41,8 +41,6 @@
public JobQueueTaskScheduler() {
this.jobQueueJobInProgressListener = new JobQueueJobInProgressListener();
- this.eagerTaskInitializationListener =
- new EagerTaskInitializationListener();
}
@Override
@@ -74,6 +72,8 @@
super.setConf(conf);
padFraction = conf.getFloat("mapred.jobtracker.taskalloc.capacitypad",
0.01f);
+ this.eagerTaskInitializationListener =
+ new EagerTaskInitializationListener(conf);
}
@Override