You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by yh...@apache.org on 2009/07/20 10:44:33 UTC

svn commit: r795719 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobTracker.java src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java

Author: yhemanth
Date: Mon Jul 20 08:44:32 2009
New Revision: 795719

URL: http://svn.apache.org/viewvc?rev=795719&view=rev
Log:
MAPREDUCE-771. Fix scheduling of setup and cleanup tasks to use free slots instead of tasks for scheduling. Contributed by Hemanth Yamijala.

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=795719&r1=795718&r2=795719&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Jul 20 08:44:32 2009
@@ -230,3 +230,6 @@
     MAPREDUCE-18. Puts some checks for cross checking whether a reduce
     task gets the correct shuffle data. (Ravi Gummadi via ddas)
 
+    MAPREDUCE-771. Fix scheduling of setup and cleanup tasks to use
+    free slots instead of tasks for scheduling. (yhemanth)
+

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=795719&r1=795718&r2=795719&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon Jul 20 08:44:32 2009
@@ -3218,12 +3218,12 @@
   }
   
   // returns cleanup tasks first, then setup tasks.
-  private synchronized List<Task> getSetupAndCleanupTasks(
+  synchronized List<Task> getSetupAndCleanupTasks(
     TaskTrackerStatus taskTracker) throws IOException {
     int maxMapTasks = taskTracker.getMaxMapSlots();
     int maxReduceTasks = taskTracker.getMaxReduceSlots();
-    int numMaps = taskTracker.countMapTasks();
-    int numReduces = taskTracker.countReduceTasks();
+    int numMaps = taskTracker.countOccupiedMapSlots();
+    int numReduces = taskTracker.countOccupiedReduceSlots();
     int numTaskTrackers = getClusterStatus().getTaskTrackers();
     int numUniqueHosts = getNumberOfUniqueHosts();
 

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java?rev=795719&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java Mon Jul 20 08:44:32 2009
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+public class TestSetupTaskScheduling extends TestCase {
+
+  public static final Log LOG = 
+    LogFactory.getLog(TestSetupTaskScheduling.class);
+
+  static String[] trackers = new String[] { "tracker_tracker1:1000",
+    "tracker_tracker2:1000", "tracker_tracker3:1000" };
+  private static FakeJobTracker jobTracker;
+
+  /**
+   * Fake JobInProgress that returns a hardcoded map or reduce
+   * setup task depending on the slot type passed in.
+   */
+  static class FakeJobWithSetupTask 
+    extends FakeObjectUtilities.FakeJobInProgress {
+    
+    FakeJobWithSetupTask(JobConf jobConf, 
+                          JobTracker tracker) throws IOException {
+      super(jobConf, tracker);
+    }
+
+    /**
+     * Initialize tasks, including setup. 
+     */
+    @Override
+    public synchronized void initTasks() throws IOException {
+      super.initTasks();
+      JobClient.RawSplit emptySplit = new JobClient.RawSplit();
+      setup = new TaskInProgress[2];
+      setup[0] = new TaskInProgress(getJobID(), "test",  emptySplit,
+          jobtracker, getJobConf(), this, numMapTasks + 1, 1);
+      setup[1] = new TaskInProgress(getJobID(), "test", numMapTasks,
+          numReduceTasks + 1, jobtracker, getJobConf(), this, 1);
+    }
+    
+    /**
+     * Obtain a setup task on a map slot or reduce slot
+     * depending on what is free.
+     * 
+     * Every call to this will return either a map or reduce
+     * setup task. No check is done to see whether the task has
+     * already been returned.
+     */
+    @Override
+    public Task obtainJobSetupTask(TaskTrackerStatus tts, 
+                                             int clusterSize, 
+                                             int numUniqueHosts,
+                                             boolean isMapSlot) 
+                                  throws IOException{
+      TaskInProgress tip = null;
+      if (isMapSlot) {
+        tip = setup[0];
+      } else {
+        tip = setup[1];
+      }
+      Task t = tip.getTaskToRun(tts.getHost());
+      t.setJobSetupTask();
+      return t;
+    }
+  }
+
+  public void setUp() throws Exception {
+    JobConf conf = new JobConf();
+    conf.set("mapred.job.tracker", "localhost:0");
+    conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
+    jobTracker = new FakeJobTracker(conf, new Clock(), trackers);
+    for (String tracker : trackers) {
+      FakeObjectUtilities.establishFirstContact(jobTracker, tracker);
+    }
+  }
+
+  // create a job for testing setup tasks and reservations
+  FakeJobInProgress createJob(boolean withSetup) throws IOException {
+    JobConf conf = new JobConf();
+    conf.setSpeculativeExecution(false);
+    conf.setNumMapTasks(2);
+    conf.setNumReduceTasks(2);
+    conf.set("mapred.max.reduce.failures.percent", ".70");
+    conf.set("mapred.max.map.failures.percent", ".70");
+    FakeJobInProgress job = null;
+    if (withSetup) {
+      job = new FakeJobWithSetupTask(conf, jobTracker);
+    } else {
+      conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
+      job = new FakeJobInProgress(conf, jobTracker);
+    }
+    job.setClusterSize(trackers.length);
+    job.initTasks();
+    return job;
+  }
+  
+  // create a new TaskStatus and add to a list of status objects
+  void addNewTaskStatus(FakeJobInProgress job,
+      boolean isMapTask, String tracker, List<TaskStatus> reports) 
+        throws IOException {
+    TaskAttemptID task = null;
+    TaskStatus status = null;
+    if (isMapTask) {
+      task = job.findMapTask(tracker);
+      status = new MapTaskStatus(task, 0.01f, 2,
+            TaskStatus.State.RUNNING, "", "", tracker,
+            TaskStatus.Phase.MAP, new Counters());
+    } else {
+      task = job.findReduceTask(tracker);
+      status = new ReduceTaskStatus(task, 0.01f, 2,
+            TaskStatus.State.RUNNING, "", "", tracker,
+            TaskStatus.Phase.REDUCE, new Counters());
+    }
+    reports.add(status);
+  }
+  
+  // create a TaskTrackerStatus
+  TaskTrackerStatus createTaskTrackerStatus(String tracker, 
+      List<TaskStatus> reports) {
+    TaskTrackerStatus ttStatus =
+      new TaskTrackerStatus(tracker, 
+          JobInProgress.convertTrackerNameToHostName(tracker),
+          0, reports, 0, 2, 2);
+    return ttStatus;
+  }
+
+  /**
+   * Test that a setup task can be run against a map slot
+   * if it is free.
+   * @throws IOException
+   */
+  public void testSetupTaskReturnedForFreeMapSlots() throws IOException {
+    // create a job with a setup task.
+    FakeJobInProgress job = createJob(true);
+    jobTracker.jobs.put(job.getJobID(), job);
+    
+    // create a status simulating a free tasktracker
+    List<TaskStatus> reports = new ArrayList<TaskStatus>();
+    TaskTrackerStatus ttStatus 
+      = createTaskTrackerStatus(trackers[2], reports);
+    
+    // verify that a setup task can be assigned to a map slot.
+    List<Task> tasks = jobTracker.getSetupAndCleanupTasks(ttStatus);
+    assertEquals(1, tasks.size());
+    assertTrue(tasks.get(0).isJobSetupTask());
+    assertTrue(tasks.get(0).isMapTask());
+    jobTracker.jobs.clear();
+  }
+
+  /**
+   * Test to check that map slots are counted when returning
+   * a setup task.
+   * @throws IOException
+   */
+  public void testMapSlotsCountedForSetup() throws IOException {
+    // create a job with a setup task.
+    FakeJobInProgress job = createJob(true);
+    jobTracker.jobs.put(job.getJobID(), job);
+    
+    // create another job for reservation
+    FakeJobInProgress job1 = createJob(false);
+    jobTracker.jobs.put(job1.getJobID(), job1);
+   
+    // create TT status for testing getSetupAndCleanupTasks
+    List<TaskStatus> taskStatuses = new ArrayList<TaskStatus>();
+    addNewTaskStatus(job, true, trackers[0], taskStatuses);
+    TaskTrackerStatus ttStatus 
+      = createTaskTrackerStatus(trackers[0], taskStatuses);
+    
+    // verify that the one setup task returned is not a map task.
+    List<Task> tasks = jobTracker.getSetupAndCleanupTasks(ttStatus);
+    assertEquals(1, tasks.size());
+    assertTrue(tasks.get(0).isJobSetupTask());
+    assertFalse(tasks.get(0).isMapTask());
+    jobTracker.jobs.clear();
+  }
+
+  /**
+   * Test to check that reduce slots are also counted when returning
+   * a setup task.
+   * @throws IOException
+   */
+  public void testReduceSlotsCountedForSetup() throws IOException {
+    // create a job with a setup task.
+    FakeJobInProgress job = createJob(true);
+    jobTracker.jobs.put(job.getJobID(), job);
+    
+    // create another job whose running tasks are reported by the tracker
+    FakeJobInProgress job1 = createJob(false);
+    jobTracker.jobs.put(job1.getJobID(), job1);
+    
+    // create TT status for testing getSetupAndCleanupTasks
+    List<TaskStatus> reports = new ArrayList<TaskStatus>();
+    // because free map slots are checked first in code,
+    // we fill up map slots also.
+    addNewTaskStatus(job1, true, trackers[1], reports);
+    addNewTaskStatus(job1, false, trackers[1], reports);
+    TaskTrackerStatus ttStatus 
+      = createTaskTrackerStatus(trackers[1], reports);
+
+    // test that there should be no setup task returned,
+    // as both map and reduce slots are occupied.
+    List<Task> tasks = jobTracker.getSetupAndCleanupTasks(ttStatus);
+    assertNull(tasks);
+    jobTracker.jobs.clear();
+  }
+}