You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/03/12 17:55:27 UTC
svn commit: r752932 - in /hadoop/core/trunk: ./
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/
src/core/org/apache/hadoop/net/ src/mapred/org/apache/hadoop/mapred/
src/test/org/apache/hadoop/mapred/
Author: ddas
Date: Thu Mar 12 16:55:26 2009
New Revision: 752932
URL: http://svn.apache.org/viewvc?rev=752932&view=rev
Log:
HADOOP-4664. Introduces multiple job initialization threads, where the number of threads is configurable via mapred.jobinit.threads. Contributed by Matei Zaharia and Jothi Padmanabhan.
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
hadoop/core/trunk/src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=752932&r1=752931&r2=752932&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Mar 12 16:55:26 2009
@@ -626,6 +626,10 @@
HADOOP-5248. A testcase that checks for the existence of job directory
after the job completes. Fails if it exists. (ddas)
+ HADOOP-4664. Introduces multiple job initialization threads, where the
+ number of threads are configurable via mapred.jobinit.threads.
+ (Matei Zaharia and Jothi Padmanabhan via ddas)
+
OPTIMIZATIONS
HADOOP-3293. Fixes FileInputFormat to do provide locations for splits
Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=752932&r1=752931&r2=752932&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Thu Mar 12 16:55:26 2009
@@ -105,7 +105,6 @@
protected FairScheduler(Clock clock, boolean runBackgroundUpdates) {
this.clock = clock;
this.runBackgroundUpdates = runBackgroundUpdates;
- this.eagerInitListener = new EagerTaskInitializationListener();
this.jobListener = new JobListener();
}
@@ -113,6 +112,7 @@
public void start() {
try {
Configuration conf = getConf();
+ this.eagerInitListener = new EagerTaskInitializationListener(conf);
eagerInitListener.start();
taskTrackerManager.addJobInProgressListener(eagerInitListener);
taskTrackerManager.addJobInProgressListener(jobListener);
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java?rev=752932&r1=752931&r2=752932&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java Thu Mar 12 16:55:26 2009
@@ -20,7 +20,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
/**
* A cached implementation of DNSToSwitchMapping that takes an
@@ -30,7 +30,7 @@
*
*/
public class CachedDNSToSwitchMapping implements DNSToSwitchMapping {
- private Map<String, String> cache = new TreeMap<String, String>();
+ private Map<String, String> cache = new ConcurrentHashMap<String, String>();
protected DNSToSwitchMapping rawMapping;
public CachedDNSToSwitchMapping(DNSToSwitchMapping rawMapping) {
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java?rev=752932&r1=752931&r2=752932&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java Thu Mar 12 16:55:26 2009
@@ -22,9 +22,12 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
import org.apache.hadoop.util.StringUtils;
@@ -34,53 +37,88 @@
*/
class EagerTaskInitializationListener extends JobInProgressListener {
+ private static final int DEFAULT_NUM_THREADS = 4;
private static final Log LOG = LogFactory.getLog(
EagerTaskInitializationListener.class.getName());
/////////////////////////////////////////////////////////////////
// Used to init new jobs that have just been created
/////////////////////////////////////////////////////////////////
- class JobInitThread implements Runnable {
+ class JobInitManager implements Runnable {
+
public void run() {
- JobInProgress job;
+ JobInProgress job = null;
while (true) {
- job = null;
try {
synchronized (jobInitQueue) {
- while (jobInitQueue.isEmpty()) {
+ while (jobInitQueue.isEmpty() && !exitFlag) {
jobInitQueue.wait();
}
- job = jobInitQueue.remove(0);
+ if (exitFlag) {
+ break;
+ }
}
- job.initTasks();
+ job = jobInitQueue.remove(0);
+ threadPool.execute(new InitJob(job));
} catch (InterruptedException t) {
+ LOG.info("JobInitManagerThread interrupted.");
break;
- } catch (Throwable t) {
- LOG.error("Job initialization failed:\n" +
- StringUtils.stringifyException(t));
- if (job != null) {
- job.fail();
- }
+ }
+ }
+ LOG.info("Shutting down thread pool");
+ threadPool.shutdownNow();
+ }
+ }
+
+ static class InitJob implements Runnable {
+
+ private JobInProgress job;
+
+ public InitJob(JobInProgress job) {
+ this.job = job;
+ }
+
+ public void run() {
+ try {
+ LOG.info("Initializing " + job.getJobID());
+ job.initTasks();
+ } catch (Throwable t) {
+ LOG.error("Job initialization failed:\n" +
+ StringUtils.stringifyException(t));
+ if (job != null) {
+ job.fail();
}
}
}
}
- private JobInitThread initJobs = new JobInitThread();
- private Thread initJobsThread;
+ private JobInitManager jobInitManager = new JobInitManager();
+ private Thread jobInitManagerThread;
private List<JobInProgress> jobInitQueue = new ArrayList<JobInProgress>();
+ private ExecutorService threadPool;
+ private int numThreads;
+ private boolean exitFlag = false;
+
+ public EagerTaskInitializationListener(Configuration conf) {
+ numThreads = conf.getInt("mapred.jobinit.threads", DEFAULT_NUM_THREADS);
+ threadPool = Executors.newFixedThreadPool(numThreads);
+ }
public void start() throws IOException {
- this.initJobsThread = new Thread(initJobs, "initJobs");
- this.initJobsThread.start();
+ this.jobInitManagerThread = new Thread(jobInitManager, "jobInitManager");
+ jobInitManagerThread.setDaemon(true);
+ this.jobInitManagerThread.start();
}
public void terminate() throws IOException {
- if (this.initJobsThread != null && this.initJobsThread.isAlive()) {
- LOG.info("Stopping initer");
- this.initJobsThread.interrupt();
+ if (jobInitManagerThread != null && jobInitManagerThread.isAlive()) {
+ LOG.info("Stopping Job Init Manager thread");
+ synchronized (jobInitQueue) {
+ exitFlag = true;
+ jobInitQueue.notify();
+ }
try {
- this.initJobsThread.join();
+ jobInitManagerThread.join();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=752932&r1=752931&r2=752932&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java Thu Mar 12 16:55:26 2009
@@ -32,6 +32,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -92,7 +93,7 @@
private static String JOBTRACKER_UNIQUE_STRING = null;
private static String LOG_DIR = null;
private static Map<String, ArrayList<PrintWriter>> openJobs =
- new HashMap<String, ArrayList<PrintWriter>>();
+ new ConcurrentHashMap<String, ArrayList<PrintWriter>>();
private static boolean disableHistory = false;
private static final String SECONDARY_FILE_SUFFIX = ".recover";
private static long jobHistoryBlockSize = 0;
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=752932&r1=752931&r2=752932&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Thu Mar 12 16:55:26 2009
@@ -41,8 +41,6 @@
public JobQueueTaskScheduler() {
this.jobQueueJobInProgressListener = new JobQueueJobInProgressListener();
- this.eagerTaskInitializationListener =
- new EagerTaskInitializationListener();
}
@Override
@@ -74,6 +72,8 @@
super.setConf(conf);
padFraction = conf.getFloat("mapred.jobtracker.taskalloc.capacitypad",
0.01f);
+ this.eagerTaskInitializationListener =
+ new EagerTaskInitializationListener(conf);
}
@Override
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java?rev=752932&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java Thu Mar 12 16:55:26 2009
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.IntWritable;
+
+public class TestParallelInitialization extends TestCase {
+
+ private static int jobCounter;
+ private static final int NUM_JOBS = 3;
+ IntWritable numJobsCompleted = new IntWritable();
+
+ static void resetCounters() {
+ jobCounter = 0;
+ }
+
+ class FakeJobInProgress extends JobInProgress {
+
+ public FakeJobInProgress(JobConf jobConf,
+ FakeTaskTrackerManager taskTrackerManager) throws IOException {
+ super(new JobID("test", ++jobCounter), jobConf);
+ this.startTime = System.currentTimeMillis();
+ this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP);
+ this.status.setJobPriority(JobPriority.NORMAL);
+ this.status.setStartTime(startTime);
+ }
+
+ @Override
+ public synchronized void initTasks() throws IOException {
+ try {
+ int jobNumber = this.getJobID().getId();
+ synchronized (numJobsCompleted) {
+ while (numJobsCompleted.get() != (NUM_JOBS - jobNumber)) {
+ numJobsCompleted.wait();
+ }
+ numJobsCompleted.set(numJobsCompleted.get() + 1);
+ numJobsCompleted.notifyAll();
+ LOG.info("JobNumber " + jobNumber + " succeeded");
+ }
+ } catch (InterruptedException ie) {};
+ this.status.setRunState(JobStatus.SUCCEEDED);
+ }
+
+ @Override
+ synchronized void fail() {
+ this.status.setRunState(JobStatus.FAILED);
+ }
+ }
+
+ static class FakeTaskTrackerManager implements TaskTrackerManager {
+
+ int maps = 0;
+ int reduces = 0;
+ int maxMapTasksPerTracker = 2;
+ int maxReduceTasksPerTracker = 2;
+ List<JobInProgressListener> listeners =
+ new ArrayList<JobInProgressListener>();
+ QueueManager queueManager;
+
+ private Map<String, TaskTrackerStatus> trackers =
+ new HashMap<String, TaskTrackerStatus>();
+
+ public FakeTaskTrackerManager() {
+ JobConf conf = new JobConf();
+ queueManager = new QueueManager(conf);
+ trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
+ new ArrayList<TaskStatus>(), 0,
+ maxMapTasksPerTracker, maxReduceTasksPerTracker));
+ }
+
+ public ClusterStatus getClusterStatus() {
+ int numTrackers = trackers.size();
+ return new ClusterStatus(numTrackers, 0,
+ JobTracker.TASKTRACKER_EXPIRY_INTERVAL,
+ maps, reduces,
+ numTrackers * maxMapTasksPerTracker,
+ numTrackers * maxReduceTasksPerTracker,
+ JobTracker.State.RUNNING);
+ }
+
+ public int getNumberOfUniqueHosts() {
+ return 0;
+ }
+
+ public Collection<TaskTrackerStatus> taskTrackers() {
+ return trackers.values();
+ }
+
+ public void addJobInProgressListener(JobInProgressListener listener) {
+ listeners.add(listener);
+ }
+
+ public void removeJobInProgressListener(JobInProgressListener listener) {
+ listeners.remove(listener);
+ }
+
+
+ public QueueManager getQueueManager() {
+ return queueManager;
+ }
+
+ public int getNextHeartbeatInterval() {
+ return MRConstants.HEARTBEAT_INTERVAL_MIN;
+ }
+
+ public void killJob(JobID jobid) {
+ return;
+ }
+
+ public JobInProgress getJob(JobID jobid) {
+ return null;
+ }
+
+ // Test methods
+
+ public void submitJob(JobInProgress job) throws IOException {
+ for (JobInProgressListener listener : listeners) {
+ listener.jobAdded(job);
+ }
+ }
+ }
+
+ protected JobConf jobConf;
+ protected TaskScheduler scheduler;
+ private FakeTaskTrackerManager taskTrackerManager;
+
+ @Override
+ protected void setUp() throws Exception {
+ resetCounters();
+ jobConf = new JobConf();
+ taskTrackerManager = new FakeTaskTrackerManager();
+ scheduler = createTaskScheduler();
+ scheduler.setConf(jobConf);
+ scheduler.setTaskTrackerManager(taskTrackerManager);
+ scheduler.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ if (scheduler != null) {
+ scheduler.terminate();
+ }
+ }
+
+ protected TaskScheduler createTaskScheduler() {
+ return new JobQueueTaskScheduler();
+ }
+
+ public void testParallelInitJobs() throws IOException {
+ FakeJobInProgress[] jobs = new FakeJobInProgress[NUM_JOBS];
+
+ // Submit NUM_JOBS jobs in order. The init code will ensure
+ // that the jobs get inited in descending order of Job ids
+ // i.e. highest job id first and the smallest last.
+ // If we were not doing parallel init, the first submitted job
+ // will be inited first and that will hang
+
+ for (int i = 0; i < NUM_JOBS; i++) {
+ jobs[i] = new FakeJobInProgress(jobConf, taskTrackerManager);
+ jobs[i].getStatus().setRunState(JobStatus.PREP);
+ taskTrackerManager.submitJob(jobs[i]);
+ }
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {}
+
+ for (int i = 0; i < NUM_JOBS; i++) {
+ assertTrue(jobs[i].getStatus().getRunState() == JobStatus.SUCCEEDED);
+ }
+ }
+}