Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/03/12 17:55:27 UTC

svn commit: r752932 - in /hadoop/core/trunk: ./ src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/ src/core/org/apache/hadoop/net/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: ddas
Date: Thu Mar 12 16:55:26 2009
New Revision: 752932

URL: http://svn.apache.org/viewvc?rev=752932&view=rev
Log:
HADOOP-4664. Introduces multiple job initialization threads; the number of threads is configurable via mapred.jobinit.threads. Contributed by Matei Zaharia and Jothi Padmanabhan.
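
For reference (not part of the commit itself), the new property is read through the standard Configuration API, exactly as the listener's new constructor does below. A minimal sketch follows; the class name JobInitThreadsExample and the value 8 are purely illustrative, and an admin would normally set the property in the JobTracker-side site configuration instead:

    import org.apache.hadoop.conf.Configuration;

    // Minimal sketch of setting/reading the new property; 8 is an example value.
    public class JobInitThreadsExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInt("mapred.jobinit.threads", 8);
        // The listener falls back to 4 threads when the property is unset.
        System.out.println(conf.getInt("mapred.jobinit.threads", 4));
      }
    }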

Added:
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/core/trunk/src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=752932&r1=752931&r2=752932&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Mar 12 16:55:26 2009
@@ -626,6 +626,10 @@
     HADOOP-5248. A testcase that checks for the existence of job directory
     after the job completes. Fails if it exists. (ddas)
 
+    HADOOP-4664. Introduces multiple job initialization threads, where the 
+    number of threads is configurable via mapred.jobinit.threads.
+    (Matei Zaharia and Jothi Padmanabhan via ddas)
+
   OPTIMIZATIONS
 
     HADOOP-3293. Fixes FileInputFormat to provide locations for splits

Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=752932&r1=752931&r2=752932&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Thu Mar 12 16:55:26 2009
@@ -105,7 +105,6 @@
   protected FairScheduler(Clock clock, boolean runBackgroundUpdates) {
     this.clock = clock;
     this.runBackgroundUpdates = runBackgroundUpdates;
-    this.eagerInitListener = new EagerTaskInitializationListener();
     this.jobListener = new JobListener();
   }
 
@@ -113,6 +112,7 @@
   public void start() {
     try {
       Configuration conf = getConf();
+      this.eagerInitListener = new EagerTaskInitializationListener(conf);
       eagerInitListener.start();
       taskTrackerManager.addJobInProgressListener(eagerInitListener);
       taskTrackerManager.addJobInProgressListener(jobListener);

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java?rev=752932&r1=752931&r2=752932&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java Thu Mar 12 16:55:26 2009
@@ -20,7 +20,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * A cached implementation of DNSToSwitchMapping that takes an
@@ -30,7 +30,7 @@
  *
  */
 public class CachedDNSToSwitchMapping implements DNSToSwitchMapping {
-  private Map<String, String> cache = new TreeMap<String, String>();
+  private Map<String, String> cache = new ConcurrentHashMap<String, String>();
   protected DNSToSwitchMapping rawMapping;
   
   public CachedDNSToSwitchMapping(DNSToSwitchMapping rawMapping) {

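The TreeMap-to-ConcurrentHashMap switch is what keeps this cache safe once several initialization threads can resolve racks at the same time. A rough, self-contained illustration of the difference (hypothetical host/rack keys, not the real mapping code), assuming nothing beyond java.util.concurrent:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class RackCacheSketch {
      public static void main(String[] args) throws InterruptedException {
        // ConcurrentHashMap tolerates concurrent put/get without external
        // locking, which the TreeMap it replaces does not.
        final Map<String, String> cache = new ConcurrentHashMap<String, String>();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 4; i++) {
          final int id = i;
          pool.execute(new Runnable() {
            public void run() {
              cache.put("host" + id, "/rack" + id);  // concurrent writers
            }
          });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(cache.size());  // prints 4
      }
    }
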
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java?rev=752932&r1=752931&r2=752932&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java Thu Mar 12 16:55:26 2009
@@ -22,9 +22,12 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.util.StringUtils;
 
@@ -34,53 +37,88 @@
  */
 class EagerTaskInitializationListener extends JobInProgressListener {
   
+  private static final int DEFAULT_NUM_THREADS = 4;
   private static final Log LOG = LogFactory.getLog(
       EagerTaskInitializationListener.class.getName());
   
   /////////////////////////////////////////////////////////////////
   //  Used to init new jobs that have just been created
   /////////////////////////////////////////////////////////////////
-  class JobInitThread implements Runnable {
+  class JobInitManager implements Runnable {
+   
     public void run() {
-      JobInProgress job;
+      JobInProgress job = null;
       while (true) {
-        job = null;
         try {
           synchronized (jobInitQueue) {
-            while (jobInitQueue.isEmpty()) {
+            while (jobInitQueue.isEmpty() && !exitFlag) {
               jobInitQueue.wait();
             }
-            job = jobInitQueue.remove(0);
+            if (exitFlag) {
+              break;
+            }
           }
-          job.initTasks();
+          job = jobInitQueue.remove(0);
+          threadPool.execute(new InitJob(job));
         } catch (InterruptedException t) {
+          LOG.info("JobInitManagerThread interrupted.");
           break;
-        } catch (Throwable t) {
-          LOG.error("Job initialization failed:\n" +
-                    StringUtils.stringifyException(t));
-          if (job != null) {
-            job.fail();
-          }
+        } 
+      }
+      LOG.info("Shutting down thread pool");
+      threadPool.shutdownNow();
+    }
+  }
+  
+  static class InitJob implements Runnable {
+  
+    private JobInProgress job;
+    
+    public InitJob(JobInProgress job) {
+      this.job = job;
+    }
+    
+    public void run() {
+      try {
+        LOG.info("Initializing " + job.getJobID());
+        job.initTasks();
+      } catch (Throwable t) {
+        LOG.error("Job initialization failed:\n" +
+            StringUtils.stringifyException(t));
+        if (job != null) {
+          job.fail();
         }
       }
     }
   }
   
-  private JobInitThread initJobs = new JobInitThread();
-  private Thread initJobsThread;
+  private JobInitManager jobInitManager = new JobInitManager();
+  private Thread jobInitManagerThread;
   private List<JobInProgress> jobInitQueue = new ArrayList<JobInProgress>();
+  private ExecutorService threadPool;
+  private int numThreads;
+  private boolean exitFlag = false;
+  
+  public EagerTaskInitializationListener(Configuration conf) {
+    numThreads = conf.getInt("mapred.jobinit.threads", DEFAULT_NUM_THREADS);
+    threadPool = Executors.newFixedThreadPool(numThreads);
+  }
   
   public void start() throws IOException {
-    this.initJobsThread = new Thread(initJobs, "initJobs");
-    this.initJobsThread.start();
+    this.jobInitManagerThread = new Thread(jobInitManager, "jobInitManager");
+    jobInitManagerThread.setDaemon(true);
+    this.jobInitManagerThread.start();
   }
   
   public void terminate() throws IOException {
-    if (this.initJobsThread != null && this.initJobsThread.isAlive()) {
-      LOG.info("Stopping initer");
-      this.initJobsThread.interrupt();
+    if (jobInitManagerThread != null && jobInitManagerThread.isAlive()) {
+      LOG.info("Stopping Job Init Manager thread");
+      synchronized (jobInitQueue) {
+        exitFlag = true;
+        jobInitQueue.notify();
+      }
       try {
-        this.initJobsThread.join();
+        jobInitManagerThread.join();
       } catch (InterruptedException ex) {
         ex.printStackTrace();
       }

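To summarize the restructuring above for anyone not reading the hunks line by line: a single manager thread drains the job queue and hands each job to a fixed-size thread pool, and terminate() now signals shutdown through an exit flag rather than a bare interrupt. Below is a stripped-down, standalone sketch of that pattern, with illustrative names (InitManagerSketch is not a Hadoop class) and error handling reduced to the bare minimum:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class InitManagerSketch implements Runnable {
      private final List<Runnable> queue = new ArrayList<Runnable>();
      private final ExecutorService pool = Executors.newFixedThreadPool(4);
      private boolean exitFlag = false;

      public void run() {
        while (true) {
          Runnable next;
          synchronized (queue) {
            while (queue.isEmpty() && !exitFlag) {
              try {
                queue.wait();
              } catch (InterruptedException ie) {
                exitFlag = true;
              }
            }
            if (exitFlag) {
              break;
            }
            next = queue.remove(0);
          }
          pool.execute(next);      // the actual work runs in the pool
        }
        pool.shutdownNow();        // mirrors the shutdown path in terminate()
      }

      public void add(Runnable work) {
        synchronized (queue) {
          queue.add(work);
          queue.notify();          // wake the manager thread
        }
      }

      public void stop() {
        synchronized (queue) {
          exitFlag = true;
          queue.notify();
        }
      }

      public static void main(String[] args) throws InterruptedException {
        InitManagerSketch manager = new InitManagerSketch();
        Thread t = new Thread(manager, "initManagerSketch");
        t.start();
        manager.add(new Runnable() {
          public void run() {
            System.out.println("initializing in the pool");
          }
        });
        Thread.sleep(200);         // give the pool a moment to run the task
        manager.stop();
        t.join();
      }
    }
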
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=752932&r1=752931&r2=752932&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java Thu Mar 12 16:55:26 2009
@@ -32,6 +32,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -92,7 +93,7 @@
   private static String JOBTRACKER_UNIQUE_STRING = null;
   private static String LOG_DIR = null;
   private static Map<String, ArrayList<PrintWriter>> openJobs = 
-                     new HashMap<String, ArrayList<PrintWriter>>();
+                     new ConcurrentHashMap<String, ArrayList<PrintWriter>>();
   private static boolean disableHistory = false; 
   private static final String SECONDARY_FILE_SUFFIX = ".recover";
   private static long jobHistoryBlockSize = 0;

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=752932&r1=752931&r2=752932&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Thu Mar 12 16:55:26 2009
@@ -41,8 +41,6 @@
   
   public JobQueueTaskScheduler() {
     this.jobQueueJobInProgressListener = new JobQueueJobInProgressListener();
-    this.eagerTaskInitializationListener =
-      new EagerTaskInitializationListener();
   }
   
   @Override
@@ -74,6 +72,8 @@
     super.setConf(conf);
     padFraction = conf.getFloat("mapred.jobtracker.taskalloc.capacitypad", 
                                  0.01f);
+    this.eagerTaskInitializationListener =
+      new EagerTaskInitializationListener(conf);
   }
 
   @Override

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java?rev=752932&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java Thu Mar 12 16:55:26 2009
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.IntWritable;
+
+public class TestParallelInitialization extends TestCase {
+  
+  private static int jobCounter;
+  private static final int NUM_JOBS = 3;
+  IntWritable numJobsCompleted = new IntWritable();
+  
+  static void resetCounters() {
+    jobCounter = 0;
+  }
+  
+  class FakeJobInProgress extends JobInProgress {
+   
+    public FakeJobInProgress(JobConf jobConf,
+        FakeTaskTrackerManager taskTrackerManager) throws IOException {
+      super(new JobID("test", ++jobCounter), jobConf);
+      this.startTime = System.currentTimeMillis();
+      this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP);
+      this.status.setJobPriority(JobPriority.NORMAL);
+      this.status.setStartTime(startTime);
+    }
+
+    @Override
+    public synchronized void initTasks() throws IOException {
+      try {
+        int jobNumber = this.getJobID().getId();
+        synchronized (numJobsCompleted) {
+          while (numJobsCompleted.get() != (NUM_JOBS - jobNumber)) {
+            numJobsCompleted.wait();
+          }
+          numJobsCompleted.set(numJobsCompleted.get() + 1);
+          numJobsCompleted.notifyAll();
+          LOG.info("JobNumber " + jobNumber + " succeeded");
+        }
+      } catch (InterruptedException ie) {};
+      this.status.setRunState(JobStatus.SUCCEEDED);
+    }
+
+    @Override
+    synchronized void fail() {
+      this.status.setRunState(JobStatus.FAILED);
+    }
+  }
+  
+  static class FakeTaskTrackerManager implements TaskTrackerManager {
+    
+    int maps = 0;
+    int reduces = 0;
+    int maxMapTasksPerTracker = 2;
+    int maxReduceTasksPerTracker = 2;
+    List<JobInProgressListener> listeners =
+      new ArrayList<JobInProgressListener>();
+    QueueManager queueManager;
+    
+    private Map<String, TaskTrackerStatus> trackers =
+      new HashMap<String, TaskTrackerStatus>();
+
+    public FakeTaskTrackerManager() {
+      JobConf conf = new JobConf();
+      queueManager = new QueueManager(conf);
+      trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
+                   new ArrayList<TaskStatus>(), 0,
+                   maxMapTasksPerTracker, maxReduceTasksPerTracker));
+    }
+    
+    public ClusterStatus getClusterStatus() {
+      int numTrackers = trackers.size();
+      return new ClusterStatus(numTrackers, 0, 
+                               JobTracker.TASKTRACKER_EXPIRY_INTERVAL,
+                               maps, reduces,
+                               numTrackers * maxMapTasksPerTracker,
+                               numTrackers * maxReduceTasksPerTracker,
+                               JobTracker.State.RUNNING);
+    }
+    
+    public int getNumberOfUniqueHosts() {
+      return 0;
+    }
+
+    public Collection<TaskTrackerStatus> taskTrackers() {
+      return trackers.values();
+    }
+
+    public void addJobInProgressListener(JobInProgressListener listener) {
+      listeners.add(listener);
+    }
+
+    public void removeJobInProgressListener(JobInProgressListener listener) {
+      listeners.remove(listener);
+    }
+    
+    
+    public QueueManager getQueueManager() {
+      return queueManager;
+    }
+    
+    public int getNextHeartbeatInterval() {
+      return MRConstants.HEARTBEAT_INTERVAL_MIN;
+    }
+
+    public void killJob(JobID jobid) {
+      return;
+    }
+
+    public JobInProgress getJob(JobID jobid) {
+      return null;
+    }
+
+    // Test methods
+    
+    public void submitJob(JobInProgress job) throws IOException {
+      for (JobInProgressListener listener : listeners) {
+        listener.jobAdded(job);
+      }
+    }
+  }
+  
+  protected JobConf jobConf;
+  protected TaskScheduler scheduler;
+  private FakeTaskTrackerManager taskTrackerManager;
+
+  @Override
+  protected void setUp() throws Exception {
+    resetCounters();
+    jobConf = new JobConf();
+    taskTrackerManager = new FakeTaskTrackerManager();
+    scheduler = createTaskScheduler();
+    scheduler.setConf(jobConf);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    scheduler.start();
+  }
+  
+  @Override
+  protected void tearDown() throws Exception {
+    if (scheduler != null) {
+      scheduler.terminate();
+    }
+  }
+  
+  protected TaskScheduler createTaskScheduler() {
+    return new JobQueueTaskScheduler();
+  }
+  
+  public void testParallelInitJobs() throws IOException {
+    FakeJobInProgress[] jobs = new FakeJobInProgress[NUM_JOBS];
+    
+    // Submit NUM_JOBS jobs in order. The init code will ensure
+    // that the jobs get inited in descending order of Job ids
+    // i.e. highest job id first and the smallest last.
+    // If we were not doing parallel init, the first submitted job
+    // will be inited first and that will hang
+    
+    for (int i = 0; i < NUM_JOBS; i++) {
+      jobs[i] = new FakeJobInProgress(jobConf, taskTrackerManager);
+      jobs[i].getStatus().setRunState(JobStatus.PREP);
+      taskTrackerManager.submitJob(jobs[i]);
+    }
+    
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ie) {}
+    
+    for (int i = 0; i < NUM_JOBS; i++) {
+      assertTrue(jobs[i].getStatus().getRunState() == JobStatus.SUCCEEDED);
+    }
+  }  
+}