You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2010/06/02 12:51:17 UTC
svn commit: r950485 - in /hadoop/mapreduce/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapred/TaskTracker.java
src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerSlotManagement.java
Author: vinodkv
Date: Wed Jun 2 10:51:16 2010
New Revision: 950485
URL: http://svn.apache.org/viewvc?rev=950485&view=rev
Log:
MAPREDUCE-913. TaskRunner crashes with NPE resulting in held up slots, UNINITIALIZED tasks and hung TaskTracker. Contributed by Amareshwari Sriramadasu and Sreekanth Ramakrishnan.
Added:
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerSlotManagement.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=950485&r1=950484&r2=950485&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Jun 2 10:51:16 2010
@@ -1638,3 +1638,7 @@ Release 0.21.0 - Unreleased
(Dick King and Amareshwari Sriramadasu via tomwhite)
MAPREDUCE-118. Fix Job.getJobID(). (Amareshwari Sriramadasu via sharad)
+
+ MAPREDUCE-913. TaskRunner crashes with NPE resulting in held up slots,
+ UNINITIALIZED tasks and hung TaskTracker. (Amareshwari Sriramadasu and
+ Sreekanth Ramakrishnan via vinodkv)
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=950485&r1=950484&r2=950485&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Jun 2 10:51:16 2010
@@ -2093,7 +2093,16 @@ public class TaskTracker
reduceLauncher.addToTaskQueue(action);
}
}
-
+
+ // This method is called from unit tests
+ int getFreeSlots(boolean isMap) {
+ if (isMap) {
+ return mapLauncher.numFreeSlots.get();
+ } else {
+ return reduceLauncher.numFreeSlots.get();
+ }
+ }
+
class TaskLauncher extends Thread {
private IntWritable numFreeSlots;
private final int maxSlots;
@@ -2657,8 +2666,11 @@ public class TaskTracker
*/
void reportTaskFinished(boolean commitPending) {
if (!commitPending) {
- taskFinished();
- releaseSlot();
+ try {
+ taskFinished();
+ } finally {
+ releaseSlot();
+ }
}
notifyTTAboutTaskCompletion();
}
@@ -2728,7 +2740,15 @@ public class TaskTracker
setTaskFailState(true);
// call the script here for the failed tasks.
if (debugCommand != null) {
- runDebugScript();
+ try {
+ runDebugScript();
+ } catch (Exception e) {
+ String msg =
+ "Debug-script could not be run successfully : "
+ + StringUtils.stringifyException(e);
+ LOG.warn(msg);
+ reportDiagnosticInfo(msg);
+ }
}
}
taskStatus.setProgress(0.0f);
@@ -2749,14 +2769,17 @@ public class TaskTracker
if (needCleanup) {
removeTaskFromJob(task.getJobID(), this);
}
- try {
- cleanup(needCleanup);
- } catch (IOException ie) {
- }
+ cleanup(needCleanup);
}
-
- private void runDebugScript() {
+
+ /**
+ * Run the debug-script now. Because debug-script can be user code, we use
+ * {@link TaskController} to execute the debug script.
+ *
+ * @throws IOException
+ */
+ private void runDebugScript() throws IOException {
String taskStdout ="";
String taskStderr ="";
String taskSyslog ="";
@@ -2774,23 +2797,14 @@ public class TaskTracker
taskSyslog = FileUtil
.makeShellPath(TaskLog.getRealTaskLogFileLocation(task.getTaskID(),
task.isTaskCleanupTask(), TaskLog.LogName.SYSLOG));
- } catch(IOException e){
- LOG.warn("Exception finding task's stdout/err/syslog files");
- }
- File workDir = null;
- try {
- workDir =
- new File(lDirAlloc.getLocalPathToRead(
- TaskTracker.getLocalTaskDir(task.getUser(), task
- .getJobID().toString(), task.getTaskID()
- .toString(), task.isTaskCleanupTask())
- + Path.SEPARATOR + MRConstants.WORKDIR,
- localJobConf).toString());
- } catch (IOException e) {
- LOG.warn("Working Directory of the task " + task.getTaskID() +
- " doesnt exist. Caught exception " +
- StringUtils.stringifyException(e));
+ } catch(Exception e){
+ LOG.warn("Exception finding task's stdout/err/syslog files", e);
}
+ File workDir = new File(lDirAlloc.getLocalPathToRead(
+ TaskTracker.getLocalTaskDir(task.getUser(), task.getJobID()
+ .toString(), task.getTaskID().toString(), task
+ .isTaskCleanupTask())
+ + Path.SEPARATOR + MRConstants.WORKDIR, localJobConf).toString());
// Build the command
File stdout = TaskLog.getTaskLogFile(task.getTaskID(), task
.isTaskCleanupTask(), TaskLog.LogName.DEBUGOUT);
@@ -2820,21 +2834,10 @@ public class TaskTracker
context.stdout = stdout;
context.workDir = workDir;
context.task = task;
- try {
- getTaskController().runDebugScript(context);
- // add all lines of debug out to diagnostics
- try {
- int num = localJobConf.getInt(MRJobConfig.TASK_DEBUGOUT_LINES,
- -1);
- addDiagnostics(FileUtil.makeShellPath(stdout),num,
- "DEBUG OUT");
- } catch(IOException ioe) {
- LOG.warn("Exception in add diagnostics!");
- }
- } catch (IOException ie) {
- LOG.warn("runDebugScript failed with: " + StringUtils.
- stringifyException(ie));
- }
+ getTaskController().runDebugScript(context);
+ // add the lines of debug out to diagnostics
+ int num = localJobConf.getInt(MRJobConfig.TASK_DEBUGOUT_LINES, -1);
+ addDiagnostics(FileUtil.makeShellPath(stdout), num, "DEBUG OUT");
}
/**
@@ -2998,7 +3001,7 @@ public class TaskTracker
* otherwise the current working directory of the task
* i.e. <taskid>/work is cleaned up.
*/
- void cleanup(boolean needCleanup) throws IOException {
+ void cleanup(boolean needCleanup) {
TaskAttemptID taskId = task.getTaskID();
LOG.debug("Cleaning up " + taskId);
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerSlotManagement.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerSlotManagement.java?rev=950485&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerSlotManagement.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerSlotManagement.java Wed Jun 2 10:51:16 2010
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Regression test for MAPREDUCE-913
+ */
+public class TestTaskTrackerSlotManagement {
+
+ private static final Path TEST_DIR = new Path(System.getProperty(
+ "test.build.data", "/tmp"), "tt_slots");
+ private static final String CACHE_FILE_PATH = new Path(TEST_DIR, "test.txt")
+ .toString();
+
+ /**
+ * Test-setup. Create the cache-file.
+ *
+ * @throws Exception
+ */
+ @Before
+ public void setUp() throws Exception {
+ new File(TEST_DIR.toString()).mkdirs();
+ File myFile = new File(CACHE_FILE_PATH);
+ myFile.createNewFile();
+ }
+
+ /**
+ * Test-cleanup. Remove the cache-file.
+ *
+ * @throws Exception
+ */
+ @After
+ public void tearDown() throws Exception {
+ File myFile = new File(CACHE_FILE_PATH);
+ myFile.delete();
+ new File(TEST_DIR.toString()).delete();
+ }
+
+ /**
+ * Test case to test addition of free slot when the job fails localization due
+ * to cache file being modified after the job has started running.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testFreeingOfTaskSlots() throws Exception {
+ // Start a cluster with no task tracker.
+ MiniMRCluster mrCluster = new MiniMRCluster(0, "file:///", 1);
+ Configuration conf = mrCluster.createJobConf();
+ Cluster cluster = new Cluster(conf);
+ // set the debug script so that TT tries to launch the debug
+ // script for failed tasks.
+ conf.set(JobContext.MAP_DEBUG_SCRIPT, "/bin/echo");
+ conf.set(JobContext.REDUCE_DEBUG_SCRIPT, "/bin/echo");
+ Job j = MapReduceTestUtil.createJob(conf, new Path(TEST_DIR, "in"),
+ new Path(TEST_DIR, "out"), 0, 0);
+ // Add the local file created to the cache files of the job
+ j.addCacheFile(new URI(CACHE_FILE_PATH));
+ j.setMaxMapAttempts(1);
+ j.setMaxReduceAttempts(1);
+ // Submit the job and return immediately.
+ // Job submit now takes care of setting the last
+ // modified time of the cache file.
+ j.submit();
+ // Look up the file and modify the modification time.
+ File myFile = new File(CACHE_FILE_PATH);
+ myFile.setLastModified(0L);
+ // Start up the task tracker after the time has been changed.
+ mrCluster.startTaskTracker(null, null, 0, 1);
+ // Now wait for the job to fail.
+ j.waitForCompletion(false);
+ Assert.assertFalse("Job successfully completed.", j.isSuccessful());
+
+ ClusterMetrics metrics = cluster.getClusterStatus();
+ // validate number of slots in JobTracker
+ Assert.assertEquals(0, metrics.getOccupiedMapSlots());
+ Assert.assertEquals(0, metrics.getOccupiedReduceSlots());
+
+ // validate number of slots in TaskTracker
+ TaskTracker tt = mrCluster.getTaskTrackerRunner(0).getTaskTracker();
+ Assert.assertEquals(metrics.getMapSlotCapacity(), tt.getFreeSlots(true));
+ Assert.assertEquals(metrics.getReduceSlotCapacity(), tt.getFreeSlots(false));
+
+ }
+}