You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2011/08/01 23:28:27 UTC
svn commit: r1152938 - in /hadoop/common/branches/branch-0.20-security:
CHANGES.txt src/mapred/org/apache/hadoop/mapred/TaskTracker.java
src/test/org/apache/hadoop/mapred/TestTaskLauncherThreaded.java
Author: ddas
Date: Mon Aug 1 21:28:25 2011
New Revision: 1152938
URL: http://svn.apache.org/viewvc?rev=1152938&view=rev
Log:
MAPREDUCE-2705. Implements launch of multiple tasks concurrently. Contributed by Thomas Graves.
Added:
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskLauncherThreaded.java
Modified:
hadoop/common/branches/branch-0.20-security/CHANGES.txt
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1152938&r1=1152937&r2=1152938&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Mon Aug 1 21:28:25 2011
@@ -209,6 +209,9 @@ Release 0.20.204.0 - unreleased
HADOOP-7459. Remove jdk-1.6.0 dependency check from rpm. (omalley)
+ MAPREDUCE-2705. Implements launch of multiple tasks concurrently.
+ (Thomas Graves via ddas)
+
Release 0.20.203.1 - Unreleased
HADOOP-7330. Fix MetricsSourceAdapter to use the value instead of the
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1152938&r1=1152937&r2=1152938&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Mon Aug 1 21:28:25 2011
@@ -1312,7 +1312,7 @@ public class TaskTracker implements MRCo
return localJobFile;
}
- private void launchTaskForJob(TaskInProgress tip, JobConf jobConf,
+ protected void launchTaskForJob(TaskInProgress tip, JobConf jobConf,
RunningJob rjob) throws IOException {
synchronized (tip) {
jobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
@@ -2292,40 +2292,45 @@ public class TaskTracker implements MRCo
}
return tip;
}
+
/**
* Start a new task.
* All exceptions are handled locally, so that we don't mess up the
* task tracker.
* @throws InterruptedException
*/
- void startNewTask(TaskInProgress tip) throws InterruptedException {
- try {
- RunningJob rjob = localizeJob(tip);
- tip.getTask().setJobFile(rjob.localizedJobConf.toString());
- // Localization is done. Neither rjob.jobConf nor rjob.ugi can be null
- launchTaskForJob(tip, new JobConf(rjob.jobConf), rjob);
- } catch (Throwable e) {
- String msg = ("Error initializing " + tip.getTask().getTaskID() +
- ":\n" + StringUtils.stringifyException(e));
- LOG.warn(msg);
- tip.reportDiagnosticInfo(msg);
- try {
- tip.kill(true);
- tip.cleanup(true);
- } catch (IOException ie2) {
- LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
- } catch (InterruptedException ie2) {
- LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
- }
-
- // Careful!
- // This might not be an 'Exception' - don't handle 'Error' here!
- if (e instanceof Error) {
- throw ((Error) e);
+ void startNewTask(final TaskInProgress tip) throws InterruptedException {
+ Thread launchThread = new Thread(new Runnable() { // one new thread per task: localization and launch run there, not on the calling thread
+ @Override
+ public void run() {
+ try {
+ RunningJob rjob = localizeJob(tip);
+ tip.getTask().setJobFile(rjob.getLocalizedJobConf().toString());
+ // Localization is done. Neither rjob.jobConf nor rjob.ugi can be null
+ launchTaskForJob(tip, new JobConf(rjob.getJobConf()), rjob);
+ } catch (Throwable e) { // any failure is logged, reported on the task, and the task is killed and cleaned up
+ String msg = ("Error initializing " + tip.getTask().getTaskID() +
+ ":\n" + StringUtils.stringifyException(e));
+ LOG.warn(msg);
+ tip.reportDiagnosticInfo(msg);
+ try {
+ tip.kill(true);
+ tip.cleanup(true);
+ } catch (IOException ie2) {
+ LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
+ } catch (InterruptedException ie2) { // logged only; the interrupt status is not restored
+ LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
+ }
+ if (e instanceof Error) { // an Error is only logged here; the removed code rethrew it to the caller
+ LOG.error("TaskLauncher error " +
+ StringUtils.stringifyException(e));
+ }
+ }
+ }
- }
+ });
+ launchThread.start(); // returns right after starting the thread; no statement of this method waits for the task
+ }
-
+
void addToMemoryManager(TaskAttemptID attemptId, boolean isMap,
JobConf conf) {
if (isTaskMemoryManagerEnabled()) {
@@ -3508,6 +3513,10 @@ public class TaskTracker implements MRCo
JobConf getJobConf() {
return jobConf;
}
+
+ Path getLocalizedJobConf() { // accessor for the localizedJobConf field; startNewTask calls it instead of reading the field, and the test stubs it on a mocked RunningJob
+ return localizedJobConf;
+ }
}
/**
Added: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskLauncherThreaded.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskLauncherThreaded.java?rev=1152938&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskLauncherThreaded.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskLauncherThreaded.java Mon Aug 1 21:28:25 2011
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.mapred.TaskTracker.TaskLauncher;
+import org.apache.hadoop.mapred.TaskTracker.RunningJob;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.junit.Test;
+
+/**
+ * Tests that {@link TaskLauncher} launches tasks concurrently, i.e. that a
+ * task stuck in job localization does not hold up the launch of other tasks.
+ */
+public class TestTaskLauncherThreaded {
+  // These fields are shared between the test thread and the task-launching
+  // threads, so they are volatile: a write made by one thread must become
+  // visible to the polling loops of the other. The counters are never
+  // incremented concurrently, because the first task is only released after
+  // the second one has been observed to launch.
+  private static volatile int jobLocalizedCount = 0;
+  private static volatile int jobLaunchCount = 0;
+  private static volatile boolean quitWaiting = false;
+  private static volatile boolean firstJobStarted = false;
+  private static volatile boolean firstJobFinished = false;
+
+  /**
+   * TaskTracker whose localization and launch steps are replaced by stubs
+   * that only record their progress in the static fields of the test.
+   */
+  private static class MyTaskTracker extends TaskTracker {
+
+    /**
+     * Stub for a function called from startNewTask. The first call blocks
+     * until the test sets quitWaiting, which simulates a long localization;
+     * every later call returns immediately.
+     *
+     * @param tip the task being localized (unused by the stub)
+     * @return a mocked RunningJob
+     */
+    @Override
+    RunningJob localizeJob(TaskInProgress tip)
+        throws IOException, InterruptedException {
+      if (!firstJobStarted) {
+        firstJobStarted = true;
+        while (!quitWaiting) {
+          Thread.sleep(100);
+        }
+        firstJobFinished = true;
+      }
+      // mock out a RunningJob
+      RunningJob rjob = mock(RunningJob.class);
+      when(rjob.getLocalizedJobConf()).thenReturn(new Path("testing"));
+      when(rjob.getJobConf()).thenReturn(new JobConf());
+      jobLocalizedCount++;
+
+      return rjob;
+    }
+
+    /** Stub that only counts the launch attempt. */
+    @Override
+    protected void launchTaskForJob(TaskInProgress tip, JobConf jobConf,
+        RunningJob rjob) throws IOException {
+      jobLaunchCount++;
+    }
+  }
+
+  /**
+   * Tests the case "task localizing doesn't block other tasks".
+   *
+   * Launches one task that simulates a task doing large localization,
+   * then starts a second task and verifies that second task is not
+   * blocked waiting behind the first task.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testLocalizationNotBlockingOtherTasks() throws IOException {
+    // setup a TaskTracker
+    JobConf ttConf = new JobConf();
+    ttConf.setInt("mapred.tasktracker.map.tasks.maximum", 4);
+    TaskTracker tt = new MyTaskTracker();
+
+    tt.runningJobs = new TreeMap<JobID, TaskTracker.RunningJob>();
+    tt.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
+    tt.setIndexCache(new IndexCache(ttConf));
+    tt.setTaskMemoryManagerEnabledFlag();
+
+    // start map-task launcher with four slots
+    TaskLauncher mapLauncher = tt.new TaskLauncher(TaskType.MAP, 4);
+    mapLauncher.start();
+
+    // launch a task which simulates large localization
+    String jtId = "test";
+    TaskAttemptID attemptID = new TaskAttemptID(jtId, 1, true, 0, 0);
+    Task task = new MapTask(null, attemptID, 0, null, 2);
+    mapLauncher.addToTaskQueue(new LaunchTaskAction(task));
+    // verify that task is added to runningTasks
+    TaskInProgress runningTip = tt.runningTasks.get(attemptID);
+    assertNotNull(runningTip);
+
+    // wait for a while for the first task to start initializing
+    // this loop waits at most for 30 seconds
+    for (int i = 0; i < 300; i++) {
+      if (firstJobStarted) {
+        break;
+      }
+      UtilsForTests.waitFor(100);
+    }
+    // without this check a timeout above would let the second task take the
+    // blocking role and the failure would surface in a confusing place
+    assertTrue("First task didn't start initializing", firstJobStarted);
+
+    // Now start a second task and make sure it doesn't wait while first one initializes
+    String secondjtId = "test2";
+    TaskAttemptID secondAttemptID = new TaskAttemptID(secondjtId, 1, true, 0, 0);
+    Task secondTask = new MapTask(null, secondAttemptID, 0, null, 2);
+    mapLauncher.addToTaskQueue(new LaunchTaskAction(secondTask));
+    // verify that task is added to runningTasks
+    TaskInProgress secondRunningTip = tt.runningTasks.get(secondAttemptID);
+    assertNotNull(secondRunningTip);
+
+    // wait for a while for the second task to be launched
+    // this loop waits at most for 30 seconds
+    for (int i = 0; i < 300; i++) {
+      if (jobLaunchCount > 0) {
+        break;
+      }
+      UtilsForTests.waitFor(100);
+    }
+
+    // only the second task may have got past localization at this point
+    assertEquals("Second task didn't run or both ran", 1, jobLocalizedCount);
+    assertEquals("second task didn't try to launch", 1, jobLaunchCount);
+    assertFalse("First task finished initializing before it was released",
+        firstJobFinished);
+
+    // tell first task to stop waiting
+    quitWaiting = true;
+
+    // wait for a while for the first task to finish initializing
+    // this loop waits at most for 30 seconds
+    for (int i = 0; i < 300; i++) {
+      if (firstJobFinished) {
+        break;
+      }
+      UtilsForTests.waitFor(100);
+    }
+    assertTrue("First task didn't finish initializing", firstJobFinished);
+
+    // wait for a while for the first task to be launched as well
+    // this loop waits at most for 30 seconds
+    for (int i = 0; i < 300; i++) {
+      if (jobLaunchCount > 1) {
+        break;
+      }
+      UtilsForTests.waitFor(100);
+    }
+    assertEquals("Both tasks didn't run", 2, jobLocalizedCount);
+    assertEquals("First task didn't try to launch", 2, jobLaunchCount);
+
+  }
+
+}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.mapred.TaskTracker.TaskLauncher;
+import org.apache.hadoop.mapred.TaskTracker.RunningJob;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.junit.Test;
+
+/**
+ * Tests that {@link TaskLauncher} launches tasks concurrently, i.e. that a
+ * task stuck in job localization does not hold up the launch of other tasks.
+ */
+public class TestTaskLauncherThreaded {
+  // These fields are shared between the test thread and the task-launching
+  // threads, so they are volatile: a write made by one thread must become
+  // visible to the polling loops of the other. The counters are never
+  // incremented concurrently, because the first task is only released after
+  // the second one has been observed to launch.
+  private static volatile int jobLocalizedCount = 0;
+  private static volatile int jobLaunchCount = 0;
+  private static volatile boolean quitWaiting = false;
+  private static volatile boolean firstJobStarted = false;
+  private static volatile boolean firstJobFinished = false;
+
+  /**
+   * TaskTracker whose localization and launch steps are replaced by stubs
+   * that only record their progress in the static fields of the test.
+   */
+  private static class MyTaskTracker extends TaskTracker {
+
+    /**
+     * Stub for a function called from startNewTask. The first call blocks
+     * until the test sets quitWaiting, which simulates a long localization;
+     * every later call returns immediately.
+     *
+     * @param tip the task being localized (unused by the stub)
+     * @return a mocked RunningJob
+     */
+    @Override
+    RunningJob localizeJob(TaskInProgress tip)
+        throws IOException, InterruptedException {
+      if (!firstJobStarted) {
+        firstJobStarted = true;
+        while (!quitWaiting) {
+          Thread.sleep(100);
+        }
+        firstJobFinished = true;
+      }
+      // mock out a RunningJob
+      RunningJob rjob = mock(RunningJob.class);
+      when(rjob.getLocalizedJobConf()).thenReturn(new Path("testing"));
+      when(rjob.getJobConf()).thenReturn(new JobConf());
+      jobLocalizedCount++;
+
+      return rjob;
+    }
+
+    /** Stub that only counts the launch attempt. */
+    @Override
+    protected void launchTaskForJob(TaskInProgress tip, JobConf jobConf,
+        RunningJob rjob) throws IOException {
+      jobLaunchCount++;
+    }
+  }
+
+  /**
+   * Tests the case "task localizing doesn't block other tasks".
+   *
+   * Launches one task that simulates a task doing large localization,
+   * then starts a second task and verifies that second task is not
+   * blocked waiting behind the first task.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testLocalizationNotBlockingOtherTasks() throws IOException {
+    // setup a TaskTracker
+    JobConf ttConf = new JobConf();
+    ttConf.setInt("mapred.tasktracker.map.tasks.maximum", 4);
+    TaskTracker tt = new MyTaskTracker();
+
+    tt.runningJobs = new TreeMap<JobID, TaskTracker.RunningJob>();
+    tt.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
+    tt.setIndexCache(new IndexCache(ttConf));
+    tt.setTaskMemoryManagerEnabledFlag();
+
+    // start map-task launcher with four slots
+    TaskLauncher mapLauncher = tt.new TaskLauncher(TaskType.MAP, 4);
+    mapLauncher.start();
+
+    // launch a task which simulates large localization
+    String jtId = "test";
+    TaskAttemptID attemptID = new TaskAttemptID(jtId, 1, true, 0, 0);
+    Task task = new MapTask(null, attemptID, 0, null, 2);
+    mapLauncher.addToTaskQueue(new LaunchTaskAction(task));
+    // verify that task is added to runningTasks
+    TaskInProgress runningTip = tt.runningTasks.get(attemptID);
+    assertNotNull(runningTip);
+
+    // wait for a while for the first task to start initializing
+    // this loop waits at most for 30 seconds
+    for (int i = 0; i < 300; i++) {
+      if (firstJobStarted) {
+        break;
+      }
+      UtilsForTests.waitFor(100);
+    }
+    // without this check a timeout above would let the second task take the
+    // blocking role and the failure would surface in a confusing place
+    assertTrue("First task didn't start initializing", firstJobStarted);
+
+    // Now start a second task and make sure it doesn't wait while first one initializes
+    String secondjtId = "test2";
+    TaskAttemptID secondAttemptID = new TaskAttemptID(secondjtId, 1, true, 0, 0);
+    Task secondTask = new MapTask(null, secondAttemptID, 0, null, 2);
+    mapLauncher.addToTaskQueue(new LaunchTaskAction(secondTask));
+    // verify that task is added to runningTasks
+    TaskInProgress secondRunningTip = tt.runningTasks.get(secondAttemptID);
+    assertNotNull(secondRunningTip);
+
+    // wait for a while for the second task to be launched
+    // this loop waits at most for 30 seconds
+    for (int i = 0; i < 300; i++) {
+      if (jobLaunchCount > 0) {
+        break;
+      }
+      UtilsForTests.waitFor(100);
+    }
+
+    // only the second task may have got past localization at this point
+    assertEquals("Second task didn't run or both ran", 1, jobLocalizedCount);
+    assertEquals("second task didn't try to launch", 1, jobLaunchCount);
+    assertFalse("First task finished initializing before it was released",
+        firstJobFinished);
+
+    // tell first task to stop waiting
+    quitWaiting = true;
+
+    // wait for a while for the first task to finish initializing
+    // this loop waits at most for 30 seconds
+    for (int i = 0; i < 300; i++) {
+      if (firstJobFinished) {
+        break;
+      }
+      UtilsForTests.waitFor(100);
+    }
+    assertTrue("First task didn't finish initializing", firstJobFinished);
+
+    // wait for a while for the first task to be launched as well
+    // this loop waits at most for 30 seconds
+    for (int i = 0; i < 300; i++) {
+      if (jobLaunchCount > 1) {
+        break;
+      }
+      UtilsForTests.waitFor(100);
+    }
+    assertEquals("Both tasks didn't run", 2, jobLocalizedCount);
+    assertEquals("First task didn't try to launch", 2, jobLaunchCount);
+
+  }
+
+}