You are viewing a plain-text version of this content. The link to the canonical version was lost in the plain-text conversion.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2010/05/11 10:34:13 UTC
svn commit: r943039 - in /hadoop/mapreduce/trunk: ./ conf/
src/c++/task-controller/
src/contrib/streaming/src/test/org/apache/hadoop/streaming/
src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/pipes/
src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapreduce/
Author: vinodkv
Date: Tue May 11 08:34:12 2010
New Revision: 943039
URL: http://svn.apache.org/viewvc?rev=943039&view=rev
Log:
MAPREDUCE-1607. Task controller may not set permissions for a task cleanup attempt's log directory. Contributed by Amareshwari Sriramadasu.
Added:
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJvmReuse.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/conf/log4j.properties
hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogAppender.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=943039&r1=943038&r2=943039&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue May 11 08:34:12 2010
@@ -76,6 +76,9 @@ Release 0.21.0 - Unreleased
MAPREDUCE-1644. Remove Sqoop contrib module. (Aaron Kimball via cdouglas)
+ MAPREDUCE-1607. Task controller may not set permissions for a
+ task cleanup attempt's log directory (Amareshwari Sriramadasu via vinodkv)
+
NEW FEATURES
MAPREDUCE-706. Support for FIFO pools in the fair scheduler.
Modified: hadoop/mapreduce/trunk/conf/log4j.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/conf/log4j.properties?rev=943039&r1=943038&r2=943039&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/conf/log4j.properties (original)
+++ hadoop/mapreduce/trunk/conf/log4j.properties Tue May 11 08:34:12 2010
@@ -55,6 +55,7 @@ log4j.appender.console.layout.Conversion
#Default values
hadoop.tasklog.taskid=null
+hadoop.tasklog.iscleanup=false
hadoop.tasklog.noKeepSplits=4
hadoop.tasklog.totalLogFileSize=100
hadoop.tasklog.purgeLogSplits=true
@@ -62,6 +63,7 @@ hadoop.tasklog.logsRetainHours=12
log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
+log4j.appender.TLA.isCleanup=${hadoop.tasklog.iscleanup}
log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
Modified: hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/task-controller.c?rev=943039&r1=943038&r2=943039&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c Tue May 11 08:34:12 2010
@@ -583,10 +583,6 @@ int prepare_task_logs(const char *log_di
if (stat(task_log_dir, &filestat) != 0) {
if (errno == ENOENT) {
// See TaskRunner.java to see that an absent log-dir doesn't fail the task.
- // Task log dir for cleanup tasks will not have the name
- // task-attempt-id.cleanup. Instead a log.index.cleanup is created in
- // task-attempt log dir. We check if the directory exists and return if
- // it doesn't. So the following will work for cleanup attempts too.
#ifdef DEBUG
fprintf(LOGFILE, "task_log_dir %s doesn't exist. Not doing anything.\n",
task_log_dir);
Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java?rev=943039&r1=943038&r2=943039&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java Tue May 11 08:34:12 2010
@@ -135,6 +135,7 @@ public class TestStreamingTaskLog {
long logSize = USERLOG_LIMIT_KB * 1024;
assertTrue("environment set for child is wrong", env.contains("INFO,TLA")
&& env.contains("-Dhadoop.tasklog.taskid=attempt_")
- && env.contains("-Dhadoop.tasklog.totalLogFileSize=" + logSize));
+ && env.contains("-Dhadoop.tasklog.totalLogFileSize=" + logSize)
+ && env.contains("-Dhadoop.tasklog.iscleanup=false"));
}
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java?rev=943039&r1=943038&r2=943039&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java Tue May 11 08:34:12 2010
@@ -69,8 +69,9 @@ class Child {
int port = Integer.parseInt(args[1]);
final InetSocketAddress address = new InetSocketAddress(host, port);
final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
+ final String logLocation = args[3];
final int SLEEP_LONGER_COUNT = 5;
- int jvmIdInt = Integer.parseInt(args[3]);
+ int jvmIdInt = Integer.parseInt(args[4]);
JVMId jvmId = new JVMId(firstTaskid.getJobID(),
firstTaskid.getTaskType() == TaskType.MAP,jvmIdInt);
@@ -109,7 +110,7 @@ class Child {
public void run() {
try {
if (taskid != null) {
- TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+ TaskLog.syncLogs(logLocation, taskid, isCleanup);
}
} catch (Throwable throwable) {
}
@@ -123,7 +124,7 @@ class Child {
try {
Thread.sleep(5000);
if (taskid != null) {
- TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+ TaskLog.syncLogs(logLocation, taskid, isCleanup);
}
} catch (InterruptedException ie) {
} catch (IOException iee) {
@@ -175,7 +176,7 @@ class Child {
//create the index file so that the log files
//are viewable immediately
- TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+ TaskLog.syncLogs(logLocation, taskid, isCleanup);
final JobConf job = new JobConf(task.getJobFile());
// set the jobTokenFile into task
@@ -215,7 +216,7 @@ class Child {
FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
taskFinal.run(job, umbilical); // run the task
} finally {
- TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+ TaskLog.syncLogs(logLocation, taskid, isCleanup);
}
return null;
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=943039&r1=943038&r2=943039&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java Tue May 11 08:34:12 2010
@@ -72,19 +72,21 @@ public class TaskLog {
}
}
- public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
- return new File(getAttemptDir(taskid.toString()), filter.toString());
- }
- public static File getRealTaskLogFileLocation(TaskAttemptID taskid,
+ public static File getTaskLogFile(TaskAttemptID taskid, boolean isCleanup,
LogName filter) {
+ return new File(getAttemptDir(taskid, isCleanup), filter.toString());
+ }
+
+ static File getRealTaskLogFileLocation(TaskAttemptID taskid,
+ boolean isCleanup, LogName filter) {
LogFileDetail l;
try {
- l = getTaskLogFileDetail(taskid, filter);
+ l = getLogFileDetail(taskid, filter, isCleanup);
} catch (IOException ie) {
LOG.error("getTaskLogFileDetail threw an exception " + ie);
return null;
}
- return new File(getAttemptDir(l.location), filter.toString());
+ return new File(l.location, filter.toString());
}
private static class LogFileDetail {
final static String LOCATION = "LOG_DIR:";
@@ -93,16 +95,11 @@ public class TaskLog {
long length;
}
- private static LogFileDetail getTaskLogFileDetail(TaskAttemptID taskid,
- LogName filter) throws IOException {
- return getLogFileDetail(taskid, filter, false);
- }
-
private static LogFileDetail getLogFileDetail(TaskAttemptID taskid,
LogName filter,
boolean isCleanup)
throws IOException {
- File indexFile = getIndexFile(taskid.toString(), isCleanup);
+ File indexFile = getIndexFile(taskid, isCleanup);
BufferedReader fis = new BufferedReader(new java.io.FileReader(indexFile));
//the format of the index file is
//LOG_DIR: <the dir where the task logs are really stored>
@@ -120,7 +117,7 @@ public class TaskLog {
//to be associated with each task attempt since jvm reuse is disabled
//when profiling/debugging is enabled
if (filter.equals(LogName.DEBUGOUT) || filter.equals(LogName.PROFILE)) {
- l.length = new File(getAttemptDir(l.location), filter.toString()).length();
+ l.length = new File(l.location, filter.toString()).length();
l.start = 0;
fis.close();
return l;
@@ -141,39 +138,32 @@ public class TaskLog {
return l;
}
- private static File getTmpIndexFile(String taskid) {
- return new File(getAttemptDir(taskid), "log.tmp");
- }
- public static File getIndexFile(String taskid) {
- return getIndexFile(taskid, false);
+ private static File getTmpIndexFile(TaskAttemptID taskid, boolean isCleanup) {
+ return new File(getAttemptDir(taskid, isCleanup), "log.tmp");
}
-
- public static File getIndexFile(String taskid, boolean isCleanup) {
- if (isCleanup) {
- return new File(getAttemptDir(taskid), "log.index.cleanup");
- } else {
- return new File(getAttemptDir(taskid), "log.index");
- }
+
+ static File getIndexFile(TaskAttemptID taskid, boolean isCleanup) {
+ return new File(getAttemptDir(taskid, isCleanup), "log.index");
}
static String getBaseLogDir() {
return System.getProperty("hadoop.log.dir");
}
- static File getAttemptDir(String taskid) {
- return new File(getJobDir(TaskAttemptID.forName(taskid).getJobID()),
- taskid);
+ static File getAttemptDir(TaskAttemptID taskid, boolean isCleanup) {
+ String cleanupSuffix = isCleanup ? ".cleanup" : "";
+ return new File(getJobDir(taskid.getJobID()), taskid + cleanupSuffix);
}
private static long prevOutLength;
private static long prevErrLength;
private static long prevLogLength;
- private static void writeToIndexFile(TaskAttemptID firstTaskid,
+ private static void writeToIndexFile(String logLocation,
boolean isCleanup)
throws IOException {
// To ensure atomicity of updates to index file, write to temporary index
// file first and then rename.
- File tmpIndexFile = getTmpIndexFile(currentTaskid.toString());
+ File tmpIndexFile = getTmpIndexFile(currentTaskid, isCleanup);
BufferedOutputStream bos =
new BufferedOutputStream(new FileOutputStream(tmpIndexFile,false));
@@ -183,20 +173,23 @@ public class TaskLog {
//STDOUT: <start-offset in the stdout file> <length>
//STDERR: <start-offset in the stderr file> <length>
//SYSLOG: <start-offset in the syslog file> <length>
- dos.writeBytes(LogFileDetail.LOCATION + firstTaskid.toString()+"\n"+
- LogName.STDOUT.toString()+":");
- dos.writeBytes(Long.toString(prevOutLength)+" ");
- dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.STDOUT)
- .length() - prevOutLength)+"\n"+LogName.STDERR+":");
- dos.writeBytes(Long.toString(prevErrLength)+" ");
- dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.STDERR)
- .length() - prevErrLength)+"\n"+LogName.SYSLOG.toString()+":");
- dos.writeBytes(Long.toString(prevLogLength)+" ");
- dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.SYSLOG)
- .length() - prevLogLength)+"\n");
+ dos.writeBytes(LogFileDetail.LOCATION + logLocation + "\n"
+ + LogName.STDOUT.toString() + ":");
+ dos.writeBytes(Long.toString(prevOutLength) + " ");
+ dos.writeBytes(Long.toString(new File(logLocation, LogName.STDOUT
+ .toString()).length() - prevOutLength)
+ + "\n" + LogName.STDERR + ":");
+ dos.writeBytes(Long.toString(prevErrLength) + " ");
+ dos.writeBytes(Long.toString(new File(logLocation, LogName.STDERR
+ .toString()).length() - prevErrLength)
+ + "\n" + LogName.SYSLOG.toString() + ":");
+ dos.writeBytes(Long.toString(prevLogLength) + " ");
+ dos.writeBytes(Long.toString(new File(logLocation, LogName.SYSLOG
+ .toString()).length() - prevLogLength)
+ + "\n");
dos.close();
- File indexFile = getIndexFile(currentTaskid.toString(), isCleanup);
+ File indexFile = getIndexFile(currentTaskid, isCleanup);
Path indexFilePath = new Path(indexFile.getAbsolutePath());
Path tmpIndexFilePath = new Path(tmpIndexFile.getAbsolutePath());
@@ -205,21 +198,15 @@ public class TaskLog {
}
localFS.rename (tmpIndexFilePath, indexFilePath);
}
- private static void resetPrevLengths(TaskAttemptID firstTaskid) {
- prevOutLength = getTaskLogFile(firstTaskid, LogName.STDOUT).length();
- prevErrLength = getTaskLogFile(firstTaskid, LogName.STDERR).length();
- prevLogLength = getTaskLogFile(firstTaskid, LogName.SYSLOG).length();
+ private static void resetPrevLengths(String logLocation) {
+ prevOutLength = new File(logLocation, LogName.STDOUT.toString()).length();
+ prevErrLength = new File(logLocation, LogName.STDERR.toString()).length();
+ prevLogLength = new File(logLocation, LogName.SYSLOG.toString()).length();
}
private volatile static TaskAttemptID currentTaskid = null;
- public synchronized static void syncLogs(TaskAttemptID firstTaskid,
- TaskAttemptID taskid)
- throws IOException {
- syncLogs(firstTaskid, taskid, false);
- }
-
@SuppressWarnings("unchecked")
- public synchronized static void syncLogs(TaskAttemptID firstTaskid,
+ public synchronized static void syncLogs(String logLocation,
TaskAttemptID taskid,
boolean isCleanup)
throws IOException {
@@ -238,9 +225,9 @@ public class TaskLog {
}
if (currentTaskid != taskid) {
currentTaskid = taskid;
- resetPrevLengths(firstTaskid);
+ resetPrevLengths(logLocation);
}
- writeToIndexFile(firstTaskid, isCleanup);
+ writeToIndexFile(logLocation, isCleanup);
}
/**
@@ -274,15 +261,10 @@ public class TaskLog {
}
}
- static class Reader extends InputStream {
+ public static class Reader extends InputStream {
private long bytesRemaining;
private FileInputStream file;
- public Reader(TaskAttemptID taskid, LogName kind,
- long start, long end) throws IOException {
- this(taskid, kind, start, end, false);
- }
-
/**
* Read a log file from start to end positions. The offsets may be negative,
* in which case they are relative to the end of the file. For example,
@@ -312,8 +294,7 @@ public class TaskLog {
start += fileDetail.start;
end += fileDetail.start;
bytesRemaining = end - start;
- file = new FileInputStream(new File(getAttemptDir(fileDetail.location),
- kind.toString()));
+ file = new FileInputStream(new File(fileDetail.location, kind.toString()));
// skip upto start
long pos = 0;
while (pos < start) {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogAppender.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogAppender.java?rev=943039&r1=943038&r2=943039&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogAppender.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogAppender.java Tue May 11 08:34:12 2010
@@ -34,6 +34,7 @@ public class TaskLogAppender extends Fil
//so that log4j can configure it from the configuration(log4j.properties).
private int maxEvents;
private Queue<LoggingEvent> tail = null;
+ private boolean isCleanup;
@Override
public void activateOptions() {
@@ -41,8 +42,8 @@ public class TaskLogAppender extends Fil
if (maxEvents > 0) {
tail = new LinkedList<LoggingEvent>();
}
- setFile(TaskLog.getTaskLogFile(TaskAttemptID.forName(taskId),
- TaskLog.LogName.SYSLOG).toString());
+ setFile(TaskLog.getTaskLogFile(TaskAttemptID.forName(taskId),
+ isCleanup, TaskLog.LogName.SYSLOG).toString());
setAppend(true);
super.activateOptions();
}
@@ -98,4 +99,22 @@ public class TaskLogAppender extends Fil
maxEvents = (int) logSize / EVENT_SIZE;
}
+ /**
+ * Set whether the task is a cleanup attempt or not.
+ *
+ * @param isCleanup
+ * true if the task is cleanup attempt, false otherwise.
+ */
+ public void setIsCleanup(boolean isCleanup) {
+ this.isCleanup = isCleanup;
+ }
+
+ /**
+ * Get whether task is cleanup attempt or not.
+ *
+ * @return true if the task is cleanup attempt, false otherwise.
+ */
+ public boolean getIsCleanup() {
+ return isCleanup;
+ }
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java?rev=943039&r1=943038&r2=943039&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java Tue May 11 08:34:12 2010
@@ -43,8 +43,9 @@ import org.apache.hadoop.util.StringUtil
public class TaskLogServlet extends HttpServlet {
private static final long serialVersionUID = -6615764817774487321L;
- private boolean haveTaskLog(TaskAttemptID taskId, TaskLog.LogName type) {
- File f = TaskLog.getTaskLogFile(taskId, type);
+ private boolean haveTaskLog(TaskAttemptID taskId, boolean isCleanup,
+ TaskLog.LogName type) {
+ File f = TaskLog.getTaskLogFile(taskId, isCleanup, type);
return f.canRead();
}
@@ -145,9 +146,10 @@ public class TaskLogServlet extends Http
* viewing task logs of old jobs(i.e. jobs finished on earlier unsecure
* cluster).
*/
- static Configuration getConfFromJobACLsFile(String attemptIdStr) {
+ static Configuration getConfFromJobACLsFile(TaskAttemptID attemptId,
+ boolean isCleanup) {
Path jobAclsFilePath = new Path(
- TaskLog.getAttemptDir(attemptIdStr).toString(), TaskRunner.jobACLsFile);
+ TaskLog.getAttemptDir(attemptId, isCleanup).toString(), TaskRunner.jobACLsFile);
Configuration conf = null;
if (new File(jobAclsFilePath.toUri().getPath()).exists()) {
conf = new Configuration(false);
@@ -176,38 +178,6 @@ public class TaskLogServlet extends Http
return;
}
- TaskAttemptID attemptId = TaskAttemptID.forName(attemptIdStr);
- if (!TaskLog.getAttemptDir(attemptIdStr).exists()) {
- response.sendError(HttpServletResponse.SC_GONE,
- "Task log directory for task " + attemptId +
- " does not exist. May be cleaned up by Task Tracker, if older logs.");
- return;
- }
-
- // get user name who is accessing
- String user = request.getRemoteUser();
- if (user != null) {
- ServletContext context = getServletContext();
- TaskTracker taskTracker = (TaskTracker) context.getAttribute(
- "task.tracker");
- // get jobACLConf from ACLs file
- Configuration jobACLConf = getConfFromJobACLsFile(attemptIdStr);
- // Ignore authorization if job-acls.xml is not found
- if (jobACLConf != null) {
- JobID jobId = attemptId.getJobID();
-
- try {
- checkAccessForTaskLogs(new JobConf(jobACLConf), user, jobId,
- taskTracker);
- } catch (AccessControlException e) {
- String errMsg = "User " + user + " failed to view tasklogs of job " +
- jobId + "!\n\n" + e.getMessage();
- response.sendError(HttpServletResponse.SC_UNAUTHORIZED, errMsg);
- return;
- }
- }
- }
-
String logFilter = request.getParameter("filter");
if (logFilter != null) {
try {
@@ -240,6 +210,38 @@ public class TaskLogServlet extends Http
isCleanup = Boolean.valueOf(sCleanup);
}
+ TaskAttemptID attemptId = TaskAttemptID.forName(attemptIdStr);
+ if (!TaskLog.getAttemptDir(attemptId, isCleanup).exists()) {
+ response.sendError(HttpServletResponse.SC_GONE,
+ "Task log directory for task " + attemptId +
+ " does not exist. May be cleaned up by Task Tracker, if older logs.");
+ return;
+ }
+
+ // get user name who is accessing
+ String user = request.getRemoteUser();
+ if (user != null) {
+ ServletContext context = getServletContext();
+ TaskTracker taskTracker = (TaskTracker) context.getAttribute(
+ "task.tracker");
+ // get jobACLConf from ACLs file
+ Configuration jobACLConf = getConfFromJobACLsFile(attemptId, isCleanup);
+ // Ignore authorization if job-acls.xml is not found
+ if (jobACLConf != null) {
+ JobID jobId = attemptId.getJobID();
+
+ try {
+ checkAccessForTaskLogs(new JobConf(jobACLConf), user, jobId,
+ taskTracker);
+ } catch (AccessControlException e) {
+ String errMsg = "User " + user + " failed to view tasklogs of job " +
+ jobId + "!\n\n" + e.getMessage();
+ response.sendError(HttpServletResponse.SC_UNAUTHORIZED, errMsg);
+ return;
+ }
+ }
+ }
+
OutputStream out = response.getOutputStream();
if( !plainText ) {
out.write(("<html>\n" +
@@ -254,11 +256,11 @@ public class TaskLogServlet extends Http
TaskLog.LogName.STDERR, isCleanup);
printTaskLog(response, out, attemptId, start, end, plainText,
TaskLog.LogName.SYSLOG, isCleanup);
- if (haveTaskLog(attemptId, TaskLog.LogName.DEBUGOUT)) {
+ if (haveTaskLog(attemptId, isCleanup, TaskLog.LogName.DEBUGOUT)) {
printTaskLog(response, out, attemptId, start, end, plainText,
TaskLog.LogName.DEBUGOUT, isCleanup);
}
- if (haveTaskLog(attemptId, TaskLog.LogName.PROFILE)) {
+ if (haveTaskLog(attemptId, isCleanup, TaskLog.LogName.PROFILE)) {
printTaskLog(response, out, attemptId, start, end, plainText,
TaskLog.LogName.PROFILE, isCleanup);
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=943039&r1=943038&r2=943039&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue May 11 08:34:12 2010
@@ -214,7 +214,7 @@ abstract class TaskRunner extends Thread
List<String> setup = getVMSetupCmd();
// Set up the redirection of the task's stdout and stderr streams
- File[] logFiles = prepareLogFiles(taskid);
+ File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());
File stdout = logFiles[0];
File stderr = logFiles[1];
tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,
@@ -285,13 +285,17 @@ abstract class TaskRunner extends Thread
* Prepare the log files for the task
*
* @param taskid
+ * @param isCleanup
* @return an array of files. The first file is stdout, the second is stderr.
* @throws IOException
*/
- File[] prepareLogFiles(TaskAttemptID taskid) throws IOException {
+ File[] prepareLogFiles(TaskAttemptID taskid, boolean isCleanup)
+ throws IOException {
File[] logFiles = new File[2];
- logFiles[0] = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
- logFiles[1] = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
+ logFiles[0] = TaskLog.getTaskLogFile(taskid, isCleanup,
+ TaskLog.LogName.STDOUT);
+ logFiles[1] = TaskLog.getTaskLogFile(taskid, isCleanup,
+ TaskLog.LogName.STDERR);
File logDir = logFiles[0].getParentFile();
boolean b = logDir.mkdirs();
if (!b) {
@@ -455,17 +459,13 @@ abstract class TaskRunner extends Thread
vargs.add(classPath);
// Setup the log4j prop
- vargs.add("-Dhadoop.log.dir=" +
- new File(System.getProperty("hadoop.log.dir")
- ).getAbsolutePath());
- vargs.add("-Dhadoop.root.logger=" + getLogLevel(conf).toString() + ",TLA");
- vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
- vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
+ setupLog4jProperties(vargs, taskid, logSize);
if (conf.getProfileEnabled()) {
if (conf.getProfileTaskRange(t.isMapTask()
).isIncluded(t.getPartition())) {
- File prof = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.PROFILE);
+ File prof = TaskLog.getTaskLogFile(taskid, t.isTaskCleanupTask(),
+ TaskLog.LogName.PROFILE);
vargs.add(String.format(conf.getProfileParams(), prof.toString()));
}
}
@@ -477,9 +477,21 @@ abstract class TaskRunner extends Thread
vargs.add(address.getAddress().getHostAddress());
vargs.add(Integer.toString(address.getPort()));
vargs.add(taskid.toString()); // pass task identifier
+ // pass task log location
+ vargs.add(TaskLog.getAttemptDir(taskid, t.isTaskCleanupTask()).toString());
return vargs;
}
+ private void setupLog4jProperties(Vector<String> vargs, TaskAttemptID taskid,
+ long logSize) {
+ vargs.add("-Dhadoop.log.dir=" +
+ new File(System.getProperty("hadoop.log.dir")).getAbsolutePath());
+ vargs.add("-Dhadoop.root.logger=" + getLogLevel(conf).toString() + ",TLA");
+ vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
+ vargs.add("-Dhadoop.tasklog.iscleanup=" + t.isTaskCleanupTask());
+ vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
+ }
+
/**
* @param taskid
* @param workDir
@@ -562,6 +574,7 @@ abstract class TaskRunner extends Thread
hadoopClientOpts = hadoopClientOpts + " ";
}
hadoopClientOpts = hadoopClientOpts + "-Dhadoop.tasklog.taskid=" + taskid
+ + " -Dhadoop.tasklog.iscleanup=" + t.isTaskCleanupTask()
+ " -Dhadoop.tasklog.totalLogFileSize=" + logSize;
env.put("HADOOP_CLIENT_OPTS", hadoopClientOpts);
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=943039&r1=943038&r2=943039&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue May 11 08:34:12 2010
@@ -2759,17 +2759,17 @@ public class TaskTracker
String jobConf = task.getJobFile();
try {
// get task's stdout file
- taskStdout = FileUtil.makeShellPath(
- TaskLog.getRealTaskLogFileLocation
- (task.getTaskID(), TaskLog.LogName.STDOUT));
- // get task's stderr file
- taskStderr = FileUtil.makeShellPath(
- TaskLog.getRealTaskLogFileLocation
- (task.getTaskID(), TaskLog.LogName.STDERR));
- // get task's syslog file
- taskSyslog = FileUtil.makeShellPath(
- TaskLog.getRealTaskLogFileLocation
- (task.getTaskID(), TaskLog.LogName.SYSLOG));
+ taskStdout = FileUtil
+ .makeShellPath(TaskLog.getRealTaskLogFileLocation(task.getTaskID(),
+ task.isTaskCleanupTask(), TaskLog.LogName.STDOUT));
+ // get task's stderr file
+ taskStderr = FileUtil
+ .makeShellPath(TaskLog.getRealTaskLogFileLocation(task.getTaskID(),
+ task.isTaskCleanupTask(), TaskLog.LogName.STDERR));
+ // get task's syslog file
+ taskSyslog = FileUtil
+ .makeShellPath(TaskLog.getRealTaskLogFileLocation(task.getTaskID(),
+ task.isTaskCleanupTask(), TaskLog.LogName.SYSLOG));
} catch(IOException e){
LOG.warn("Exception finding task's stdout/err/syslog files");
}
@@ -2788,8 +2788,8 @@ public class TaskTracker
StringUtils.stringifyException(e));
}
// Build the command
- File stdout = TaskLog.getRealTaskLogFileLocation(
- task.getTaskID(), TaskLog.LogName.DEBUGOUT);
+ File stdout = TaskLog.getTaskLogFile(task.getTaskID(), task
+ .isTaskCleanupTask(), TaskLog.LogName.DEBUGOUT);
// add pipes program as argument if it exists.
String program ="";
String executable = Submitter.getExecutable(localJobConf);
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java?rev=943039&r1=943038&r2=943039&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java Tue May 11 08:34:12 2010
@@ -97,10 +97,12 @@ class Application<K1 extends WritableCom
}
cmd.add(executable);
// wrap the command in a stdout/stderr capture
+ // we are starting map/reduce task of the pipes job. this is not a cleanup
+ // attempt.
TaskAttemptID taskid =
TaskAttemptID.forName(conf.get(MRJobConfig.TASK_ATTEMPT_ID));
- File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
- File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
+ File stdout = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDOUT);
+ File stderr = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDERR);
long logLength = TaskLog.getTaskLogLength(conf);
cmd = TaskLog.captureOutAndError(null, cmd, stdout, stderr, logLength,
false);
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java?rev=943039&r1=943038&r2=943039&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java Tue May 11 08:34:12 2010
@@ -146,7 +146,7 @@ public class TestDebugScript {
*/
static void verifyDebugScriptOutput(TaskAttemptID taskId, String expectedUser,
String expectedGroup, String expectedPerms) throws Exception {
- File output = TaskLog.getRealTaskLogFileLocation(taskId,
+ File output = TaskLog.getRealTaskLogFileLocation(taskId, false,
TaskLog.LogName.DEBUGOUT);
// Check the presence of the output file if the script is to be run.
assertTrue("Output file does not exists. DebugScript has not been run",
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJvmReuse.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJvmReuse.java?rev=943039&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJvmReuse.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJvmReuse.java Tue May 11 08:34:12 2010
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.TaskType;
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+/**
+ * Runs a job with unlimited JVM reuse on a single task tracker and checks
+ * that the task logs of every attempt, including a task-cleanup attempt,
+ * can be read back correctly.
+ */
+public class TestJvmReuse {
+  private static Path rootDir = new Path(System.getProperty("test.build.data",
+      "/tmp"), TestJvmReuse.class.getName());
+  private final int numMappers = 5;
+  // id of the task whose zeroth attempt is made to fail (the third task)
+  private static final int taskWithCleanup = 2;
+
+  /**
+   * A mapper class in which all attempts log their attempt id to stderr. The
+   * zeroth attempt of the task with id=taskWithCleanup fails with System.exit
+   * to force a cleanup attempt for the task in a new jvm.
+   */
+  public static class MapperClass extends MapReduceBase implements
+      Mapper<LongWritable, Text, Text, IntWritable> {
+    String taskid;
+    // Number of mapper instances closed so far in this JVM. It is static so
+    // the count carries over to later tasks that reuse the same JVM.
+    static int instances = 0;
+    Reporter reporter = null;
+
+    public void configure(JobConf job) {
+      taskid = job.get("mapred.task.id");
+    }
+
+    public void map(LongWritable key, Text value,
+        OutputCollector<Text, IntWritable> output, Reporter reporter)
+        throws IOException {
+      System.err.println(taskid);
+      this.reporter = reporter;
+
+      if (TaskAttemptID.forName(taskid).getTaskID().getId() == taskWithCleanup
+          && taskid.endsWith("_0")) {
+        System.exit(-1);
+      }
+    }
+
+    public void close() throws IOException {
+      ++instances;
+      // map() may never have been called (e.g. an empty split), in which
+      // case no reporter was captured and there is nothing to report to.
+      if (reporter != null) {
+        reporter.incrCounter("jvm", "use", instances);
+      }
+    }
+  }
+
+  /**
+   * Writes one small input file per mapper under inDir, configures the job
+   * with MapperClass, no reducers and unlimited JVM reuse, and submits it.
+   *
+   * @return handle of the submitted job
+   * @throws IOException if the input cannot be written or submission fails
+   */
+  public RunningJob launchJob(JobConf conf, Path inDir, Path outDir)
+      throws IOException {
+    // set up the input file system and write input text.
+    FileSystem inFs = inDir.getFileSystem(conf);
+    FileSystem outFs = outDir.getFileSystem(conf);
+    outFs.delete(outDir, true);
+    if (!inFs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+    for (int i = 0; i < numMappers; i++) {
+      // write input into input file
+      DataOutputStream file = inFs.create(new Path(inDir, "part-" + i));
+      try {
+        file.writeBytes("input");
+      } finally {
+        file.close();
+      }
+    }
+
+    // configure the mapred Job
+    conf.setMapperClass(MapperClass.class);
+    conf.setNumReduceTasks(0);
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+    // enable jvm reuse (-1: no limit on tasks per jvm)
+    conf.setNumTasksToExecutePerJvm(-1);
+    // return the RunningJob handle.
+    return new JobClient(conf).submitJob(conf);
+  }
+
+  /**
+   * Checks one attempt. The attempt's stderr must be exactly its attempt id.
+   * When isCleanup is set, the attempt must have FAILED and its cleanup
+   * attempt's stderr must be TestTaskFail.cleanupLog. Otherwise the attempt
+   * must have SUCCEEDED.
+   */
+  private void validateAttempt(TaskInProgress tip, TaskAttemptID attemptId,
+      TaskStatus ts, boolean isCleanup) throws IOException {
+    assertEquals(isCleanup, tip.isCleanupAttempt(attemptId));
+    // validate tasklogs for task attempt
+    String log = MapReduceTestUtil.readTaskLog(TaskLog.LogName.STDERR,
+        attemptId, false);
+    assertEquals(attemptId.toString(), log);
+    assertNotNull(ts);
+    if (!isCleanup) {
+      assertEquals(TaskStatus.State.SUCCEEDED, ts.getRunState());
+    } else {
+      assertEquals(TaskStatus.State.FAILED, ts.getRunState());
+      // validate tasklogs for cleanup attempt
+      log = MapReduceTestUtil.readTaskLog(TaskLog.LogName.STDERR, attemptId,
+          true);
+      assertEquals(TestTaskFail.cleanupLog, log);
+    }
+  }
+
+  // Validates the job's state, its jvm-use counter and the logs of all
+  // attempts of the job.
+  private void validateJob(RunningJob job, MiniMRCluster mr) throws IOException {
+    assertEquals(JobStatus.SUCCEEDED, job.getJobState());
+    // Every task adds its JVM-local instance count at close time, so any JVM
+    // reuse pushes the total above the number of mappers.
+    long uses = job.getCounters().findCounter("jvm", "use").getValue();
+    assertTrue("maps = " + numMappers + ", jvms = " + uses, numMappers < uses);
+
+    JobID jobId = job.getID();
+    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+
+    for (int i = 0; i < numMappers; i++) {
+      TaskAttemptID attemptId = new TaskAttemptID(new TaskID(jobId,
+          TaskType.MAP, i), 0);
+      TaskInProgress tip = jt.getTip(attemptId.getTaskID());
+      TaskStatus ts = jt.getTaskStatus(attemptId);
+      validateAttempt(tip, attemptId, ts, i == taskWithCleanup);
+      if (i == taskWithCleanup) {
+        // validate second attempt of the task
+        attemptId = new TaskAttemptID(new TaskID(jobId, TaskType.MAP, i), 1);
+        ts = jt.getTaskStatus(attemptId);
+        validateAttempt(tip, attemptId, ts, false);
+      }
+    }
+  }
+
+  /**
+   * Runs job with jvm reuse and verifies that the logs for all attempts can be
+   * read properly.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testTaskLogs() throws IOException {
+    MiniMRCluster mr = null;
+    try {
+      Configuration conf = new Configuration();
+      final int taskTrackers = 1; // taskTrackers should be 1 to test jvm reuse.
+      conf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
+      // Hand the configuration to the cluster; otherwise the single-slot
+      // setting above would never reach the task tracker.
+      mr = new MiniMRCluster(taskTrackers, "file:///", 1, null, null,
+          new JobConf(conf));
+
+      final Path inDir = new Path(rootDir, "input");
+      final Path outDir = new Path(rootDir, "output");
+      JobConf jobConf = mr.createJobConf();
+      jobConf.setOutputCommitter(TestTaskFail.CommitterWithLogs.class);
+      RunningJob rJob = launchJob(jobConf, inDir, outDir);
+      rJob.waitForCompletion();
+      validateJob(rJob, mr);
+    } finally {
+      if (mr != null) {
+        mr.shutdown();
+      }
+    }
+  }
+}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java?rev=943039&r1=943038&r2=943039&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java Tue May 11 08:34:12 2010
@@ -19,7 +19,7 @@ package org.apache.hadoop.mapred;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.InputStream;
+import java.net.HttpURLConnection;
import junit.framework.TestCase;
@@ -32,11 +32,12 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.TaskType;
public class TestTaskFail extends TestCase {
private static String taskLog = "Task attempt log";
- private static String cleanupLog = "cleanup attempt log";
+ static String cleanupLog = "cleanup attempt log";
public static class MapperClass extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable> {
@@ -48,6 +49,8 @@ public class TestTaskFail extends TestCa
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
System.err.println(taskLog);
+ assertFalse(Boolean.getBoolean(System
+ .getProperty("hadoop.tasklog.iscleanup")));
if (taskid.endsWith("_0")) {
throw new IOException();
} else if (taskid.endsWith("_1")) {
@@ -61,6 +64,15 @@ public class TestTaskFail extends TestCa
static class CommitterWithLogs extends FileOutputCommitter {
public void abortTask(TaskAttemptContext context) throws IOException {
System.err.println(cleanupLog);
+ String attemptId = System.getProperty("hadoop.tasklog.taskid");
+ assertNotNull(attemptId);
+ if (attemptId.endsWith("_0")) {
+ assertFalse(Boolean.getBoolean(System
+ .getProperty("hadoop.tasklog.iscleanup")));
+ } else {
+ assertTrue(Boolean.getBoolean(System
+ .getProperty("hadoop.tasklog.iscleanup")));
+ }
super.abortTask(context);
}
}
@@ -113,65 +125,44 @@ public class TestTaskFail extends TestCa
}
private void validateAttempt(TaskInProgress tip, TaskAttemptID attemptId,
- TaskStatus ts, boolean isCleanup)
+ TaskStatus ts, boolean isCleanup, JobTracker jt)
throws IOException {
assertEquals(isCleanup, tip.isCleanupAttempt(attemptId));
assertTrue(ts != null);
assertEquals(TaskStatus.State.FAILED, ts.getRunState());
// validate tasklogs for task attempt
- String log = readTaskLog(
+ String log = MapReduceTestUtil.readTaskLog(
TaskLog.LogName.STDERR, attemptId, false);
assertTrue(log.contains(taskLog));
+ // access the logs from web url
+ TaskTrackerStatus ttStatus = jt.getTaskTracker(
+ tip.machineWhereTaskRan(attemptId)).getStatus();
+ String tasklogUrl = TaskLogServlet.getTaskLogUrl("localhost",
+ String.valueOf(ttStatus.getHttpPort()), attemptId.toString()) +
+ "&filter=STDERR";
+ assertEquals(HttpURLConnection.HTTP_OK, TestWebUIAuthorization
+ .getHttpStatusCode(tasklogUrl, tip.getUser(), "GET"));
if (!isCleanup) {
// validate task logs: tasklog should contain both task logs
// and cleanup logs
assertTrue(log.contains(cleanupLog));
} else {
// validate tasklogs for cleanup attempt
- log = readTaskLog(
+ log = MapReduceTestUtil.readTaskLog(
TaskLog.LogName.STDERR, attemptId, true);
assertTrue(log.contains(cleanupLog));
+ // access the cleanup attempt's logs from web url
+ ttStatus = jt.getTaskTracker(tip.machineWhereCleanupRan(attemptId))
+ .getStatus();
+ String cleanupTasklogUrl = TaskLogServlet.getTaskLogUrl("localhost",
+ String.valueOf(ttStatus.getHttpPort()), attemptId.toString())
+ + "&filter=STDERR&cleanup=true";
+ assertEquals(HttpURLConnection.HTTP_OK, TestWebUIAuthorization
+ .getHttpStatusCode(cleanupTasklogUrl, tip.getUser(), "GET"));
}
}
- /**
- * Reads tasklog and returns it as string after trimming it.
- * @param filter Task log filter; can be STDOUT, STDERR,
- * SYSLOG, DEBUGOUT, DEBUGERR
- * @param taskId The task id for which the log has to collected
- * @param isCleanup whether the task is a cleanup attempt or not.
- * @return task log as string
- * @throws IOException
- */
- private String readTaskLog(TaskLog.LogName filter,
- TaskAttemptID taskId,
- boolean isCleanup)
- throws IOException {
- // string buffer to store task log
- StringBuffer result = new StringBuffer();
- int res;
-
- // reads the whole tasklog into inputstream
- InputStream taskLogReader = new TaskLog.Reader(taskId, filter, 0, -1, isCleanup);
- // construct string log from inputstream.
- byte[] b = new byte[65536];
- while (true) {
- res = taskLogReader.read(b);
- if (res > 0) {
- result.append(new String(b));
- } else {
- break;
- }
- }
- taskLogReader.close();
-
- // trim the string and return it
- String str = result.toString();
- str = str.trim();
- return str;
- }
-
- private void validateJob(RunningJob job, MiniMRCluster mr)
+ private void validateJob(RunningJob job, JobTracker jt)
throws IOException {
assertEquals(JobStatus.SUCCEEDED, job.getJobState());
@@ -181,23 +172,21 @@ public class TestTaskFail extends TestCa
// fails with an exception
TaskAttemptID attemptId =
new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 0);
- TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
- getTip(attemptId.getTaskID());
- TaskStatus ts =
- mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
- validateAttempt(tip, attemptId, ts, false);
+ TaskInProgress tip = jt.getTip(attemptId.getTaskID());
+ TaskStatus ts = jt.getTaskStatus(attemptId);
+ validateAttempt(tip, attemptId, ts, false, jt);
attemptId = new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 1);
// this should be cleanup attempt since the second attempt fails
// with System.exit
- ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
- validateAttempt(tip, attemptId, ts, true);
+ ts = jt.getTaskStatus(attemptId);
+ validateAttempt(tip, attemptId, ts, true, jt);
attemptId = new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 2);
// this should be cleanup attempt since the third attempt fails
// with Error
- ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
- validateAttempt(tip, attemptId, ts, true);
+ ts = jt.getTaskStatus(attemptId);
+ validateAttempt(tip, attemptId, ts, true, jt);
}
public void testWithDFS() throws IOException {
@@ -211,6 +200,7 @@ public class TestTaskFail extends TestCa
dfs = new MiniDFSCluster(conf, 4, true, null);
fileSys = dfs.getFileSystem();
mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
+ JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
final Path inDir = new Path("./input");
final Path outDir = new Path("./output");
String input = "The quick brown fox\nhas many silly\nred fox sox\n";
@@ -222,18 +212,18 @@ public class TestTaskFail extends TestCa
jobConf.setOutputCommitter(CommitterWithLogs.class);
RunningJob rJob = launchJob(jobConf, inDir, outDir, input);
rJob.waitForCompletion();
- validateJob(rJob, mr);
+ validateJob(rJob, jt);
// launch job with fail tasks and fail-cleanups
fileSys.delete(outDir, true);
jobConf.setOutputCommitter(CommitterWithFailTaskCleanup.class);
rJob = launchJob(jobConf, inDir, outDir, input);
rJob.waitForCompletion();
- validateJob(rJob, mr);
+ validateJob(rJob, jt);
fileSys.delete(outDir, true);
jobConf.setOutputCommitter(CommitterWithFailTaskCleanup2.class);
rJob = launchJob(jobConf, inDir, outDir, input);
rJob.waitForCompletion();
- validateJob(rJob, mr);
+ validateJob(rJob, jt);
} finally {
if (dfs != null) { dfs.shutdown(); }
if (mr != null) { mr.shutdown(); }
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=943039&r1=943038&r2=943039&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Tue May 11 08:34:12 2010
@@ -576,7 +576,8 @@ public class TestTaskTrackerLocalization
runner.setupChildTaskConfiguration(lDirAlloc);
TaskRunner.createChildTmpDir(new File(attemptWorkDir.toUri().getPath()),
localizedJobConf);
- attemptLogFiles = runner.prepareLogFiles(task.getTaskID());
+ attemptLogFiles = runner.prepareLogFiles(task.getTaskID(),
+ task.isTaskCleanupTask());
// Make sure the task-conf file is created
Path localTaskFile =
@@ -626,7 +627,7 @@ public class TestTaskTrackerLocalization
.getPath(), "tmp").exists());
// Make sure that the logs are setup properly
- File logDir = TaskLog.getAttemptDir(taskId.toString());
+ File logDir = TaskLog.getAttemptDir(taskId, task.isTaskCleanupTask());
assertTrue("task's log dir " + logDir.toString() + " doesn't exist!",
logDir.exists());
checkFilePermissions(logDir.getAbsolutePath(), "drwx------", task
@@ -655,8 +656,8 @@ public class TestTaskTrackerLocalization
taskTrackerUGI.getShortUserName(), taskTrackerUGI.getGroupNames()[0]);
// Validate the contents of jobACLsFile(both user name and job-view-acls)
- Configuration jobACLsConf =
- TaskLogServlet.getConfFromJobACLsFile(task.getTaskID().toString());
+ Configuration jobACLsConf = TaskLogServlet.getConfFromJobACLsFile(task
+ .getTaskID(), task.isTaskCleanupTask());
assertTrue(jobACLsConf.get(MRJobConfig.USER_NAME).equals(
localizedJobConf.getUser()));
assertTrue(jobACLsConf.get(MRJobConfig.JOB_ACL_VIEW_JOB).
@@ -695,7 +696,7 @@ public class TestTaskTrackerLocalization
* $taskid/work
* Also see createFileAndSetPermissions for details
*/
- void validateRemoveFiles(boolean needCleanup, boolean jvmReuse,
+ void validateRemoveTaskFiles(boolean needCleanup, boolean jvmReuse,
TaskInProgress tip) throws IOException {
// create files and set permissions 555. Verify if task controller sets
// the permissions for TT to delete the taskDir or workDir
@@ -733,7 +734,6 @@ public class TestTaskTrackerLocalization
// now try to delete the work dir and verify that there are no stale paths
JvmManager.deleteWorkDir(tracker, task);
}
- tracker.removeJobFiles(task.getUser(), jobId);
assertTrue("Some task files are not deleted!! Number of stale paths is "
+ cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0);
@@ -743,42 +743,51 @@ public class TestTaskTrackerLocalization
* Validates if task cleanup is done properly for a succeeded task
* @throws IOException
*/
- public void testTaskCleanup()
+ public void testTaskFilesRemoval()
throws Exception {
if (!canRun()) {
return;
}
- testTaskCleanup(false, false);// no needCleanup; no jvmReuse
+ testTaskFilesRemoval(false, false);// no needCleanup; no jvmReuse
}
/**
* Validates if task cleanup is done properly for a task that is not succeeded
* @throws IOException
*/
- public void testFailedTaskCleanup()
+ public void testFailedTaskFilesRemoval()
throws Exception {
if (!canRun()) {
return;
}
- testTaskCleanup(true, false);// needCleanup; no jvmReuse
+ testTaskFilesRemoval(true, false);// needCleanup; no jvmReuse
+
+ // initialize a cleanupAttempt for the task.
+ task.setTaskCleanupTask();
+ // localize task cleanup attempt
+ initializeTask();
+ checkTaskLocalization();
+
+ // verify the cleanup of cleanup attempt.
+ testTaskFilesRemoval(true, false);// needCleanup; no jvmReuse
}
/**
* Validates if task cleanup is done properly for a succeeded task
* @throws IOException
*/
- public void testTaskCleanupWithJvmUse()
+ public void testTaskFilesRemovalWithJvmUse()
throws Exception {
if (!canRun()) {
return;
}
- testTaskCleanup(false, true);// no needCleanup; jvmReuse
+ testTaskFilesRemoval(false, true);// no needCleanup; jvmReuse
}
/**
* Validates if task cleanup is done properly
*/
- private void testTaskCleanup(boolean needCleanup, boolean jvmReuse)
+ private void testTaskFilesRemoval(boolean needCleanup, boolean jvmReuse)
throws Exception {
// Localize job and localize task.
TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
@@ -792,19 +801,7 @@ public class TestTaskTrackerLocalization
// create files and set permissions 555. Verify if task controller sets
// the permissions for TT to delete the task dir or work dir properly
- validateRemoveFiles(needCleanup, jvmReuse, tip);
-
- // Check that the empty $mapreduce.cluster.local.dir/taskTracker/$user dirs are still
- // there.
- for (String localDir : localDirs) {
- Path userDir =
- new Path(localDir, TaskTracker.getUserDir(task.getUser()));
- assertTrue("User directory " + userDir + " is not present!!",
- tracker.getLocalFileSystem().exists(userDir));
- }
-
- // Test userlogs cleanup.
- verifyUserLogsCleanup();
+ validateRemoveTaskFiles(needCleanup, jvmReuse, tip);
}
/**
@@ -812,7 +809,7 @@ public class TestTaskTrackerLocalization
*
* @throws IOException
*/
- private void verifyUserLogsCleanup()
+ private void verifyUserLogsRemoval()
throws IOException {
// verify user logs cleanup
File jobUserLogDir = TaskLog.getJobDir(jobId);
@@ -832,7 +829,7 @@ public class TestTaskTrackerLocalization
* - create files with no write permissions to TT under job-work-dir
* - create files with no write permissions to TT under task-work-dir
*/
- public void testJobCleanup() throws IOException, InterruptedException {
+ public void testJobFilesRemoval() throws IOException, InterruptedException {
if (!canRun()) {
return;
}
@@ -899,6 +896,17 @@ public class TestTaskTrackerLocalization
}
assertFalse("Job " + task.getJobID() + " work dir exists after cleanup",
jWorkDirExists);
+ // Test userlogs cleanup.
+ verifyUserLogsRemoval();
+
+ // Check that the empty $mapred.local.dir/taskTracker/$user dirs are still
+ // there.
+ for (String localDir : localDirs) {
+ Path userDir =
+ new Path(localDir, TaskTracker.getUserDir(task.getUser()));
+ assertTrue("User directory " + userDir + " is not present!!",
+ tracker.getLocalFileSystem().exists(userDir));
+ }
}
/**
@@ -983,4 +991,30 @@ public class TestTaskTrackerLocalization
checkTaskLocalization();
}
+  /**
+   * Turns the test task into a task-cleanup attempt, then localizes its job
+   * and the cleanup attempt itself, validating the localized state (including
+   * permissions) after each step.
+   *
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public void testCleanupTaskLocalization() throws IOException,
+      InterruptedException {
+    if (canRun()) {
+      task.setTaskCleanupTask();
+      // Register the cleanup task with the tracker.
+      tip = tracker.new TaskInProgress(task, trackerFConf);
+
+      // Localize the job and check the result.
+      TaskTracker.RunningJob localizedJob = tracker.localizeJob(tip);
+      localizedJobConf = localizedJob.getJobConf();
+      checkJobLocalization();
+
+      // Localize the cleanup attempt and check the result.
+      initializeTask();
+      checkTaskLocalization();
+    }
+  }
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java?rev=943039&r1=943038&r2=943039&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java Tue May 11 08:34:12 2010
@@ -341,8 +341,9 @@ public class TestWebUIAuthorization exte
// delete job-acls.xml file from the task log dir of attempt and verify
// if unauthorized users can view task logs of attempt.
- Path jobACLsFilePath = new Path(
- TaskLog.getAttemptDir(attempt.toString()).toString(),
+ File attemptLogDir = TaskLog.getAttemptDir(
+ org.apache.hadoop.mapred.TaskAttemptID.downgrade(attempt), false);
+ Path jobACLsFilePath = new Path(attemptLogDir.toString(),
TaskRunner.jobACLsFile);
new File(jobACLsFilePath.toUri().getPath()).delete();
assertEquals("Incorrect return code for " + unauthorizedUser,
@@ -354,7 +355,7 @@ public class TestWebUIAuthorization exte
// delete the whole task log dir of attempt and verify that we get
// correct response code (i.e. HTTP_GONE) when task logs are accessed.
- FileUtil.fullyDelete(TaskLog.getAttemptDir(attempt.toString()));
+ FileUtil.fullyDelete(attemptLogDir);
assertEquals("Incorrect return code for " + jobSubmitter,
HttpURLConnection.HTTP_GONE, getHttpStatusCode(stdoutURL,
jobSubmitter, "GET"));
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java?rev=943039&r1=943038&r2=943039&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java Tue May 11 08:34:12 2010
@@ -23,6 +23,7 @@ import java.io.DataOutput;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InputStreamReader;
import java.text.NumberFormat;
import java.util.ArrayList;
@@ -43,7 +44,10 @@ import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapred.Utils;
+import org.apache.hadoop.mapred.TaskLog.LogName;
+import org.apache.hadoop.mapred.TaskLog.Reader;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
@@ -415,4 +419,44 @@ public class MapReduceTestUtil {
return result.toString();
}
+  /**
+   * Reads a task log in full and returns it as a string, trimmed of leading
+   * and trailing whitespace.
+   *
+   * @param filter
+   *          Task log filter; can be STDOUT, STDERR, SYSLOG, DEBUGOUT, PROFILE
+   * @param taskId
+   *          The task id for which the log has to be collected
+   * @param isCleanup
+   *          whether the task is a cleanup attempt or not.
+   * @return task log as string
+   * @throws IOException if the log cannot be opened or read
+   */
+  public static String readTaskLog(TaskLog.LogName filter,
+      org.apache.hadoop.mapred.TaskAttemptID taskId, boolean isCleanup)
+      throws IOException {
+    // Collect raw bytes and decode once at the end, so a multi-byte character
+    // split across two reads is not corrupted.
+    java.io.ByteArrayOutputStream logBytes =
+        new java.io.ByteArrayOutputStream();
+
+    // reads the whole tasklog into inputstream
+    InputStream taskLogReader = new TaskLog.Reader(taskId, filter, 0, -1,
+        isCleanup);
+    try {
+      byte[] buffer = new byte[65536];
+      int bytesRead;
+      while ((bytesRead = taskLogReader.read(buffer)) > 0) {
+        // Only the first bytesRead bytes of the buffer are valid; the rest may
+        // hold data left over from an earlier read.
+        logBytes.write(buffer, 0, bytesRead);
+      }
+    } finally {
+      taskLogReader.close();
+    }
+
+    // decode with the platform default charset, then trim
+    return logBytes.toString().trim();
+  }
+
}