You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:38:37 UTC
svn commit: r1077082 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
mapred/org/apache/hadoop/mapred/ test/org/apache/hadoop/mapred/
Author: omalley
Date: Fri Mar 4 03:38:37 2011
New Revision: 1077082
URL: http://svn.apache.org/viewvc?rev=1077082&view=rev
Log:
commit adfa7a6d969469517d5f3a91127e6ee9d65c921d
Author: Arun C Murthy <ac...@apache.org>
Date: Wed Dec 16 23:46:27 2009 +0530
MAPREDUCE-1100. Truncate user logs to prevent TaskTrackers' disks from filling up. Contributed by Vinod Kumar Vavilapalli.
From: https://issues.apache.org/jira/secure/attachment/12428200/MAPREDUCE-1100-20091216.2.txt
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1100. Truncate user logs to prevent TaskTrackers' disks from
+ filling up. (Vinod Kumar Vavilapalli via acmurthy)
+
Added:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsMonitor.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=1077082&r1=1077081&r2=1077082&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java Fri Mar 4 03:38:37 2011
@@ -195,6 +195,8 @@ class JvmManager {
equals(task.getTaskID().toString())) {
tracker.getTaskController().initializeTask(context);
}
+
+ jvmRunner.taskGiven(task);
return taskRunner.getTaskInProgress();
}
@@ -376,6 +378,13 @@ class JvmManager {
private ShellCommandExecutor shexec; // shell terminal for running the task
//context used for starting JVM
private TaskControllerContext initalContext;
+
+ private List<Task> tasksGiven = new ArrayList<Task>();
+
+ void taskGiven(Task task) {
+ tasksGiven.add(task);
+ }
+
public JvmRunner(JvmEnv env, JobID jobId) {
this.env = env;
this.jvmId = new JVMId(jobId, isMap, rand.nextInt());
@@ -384,6 +393,9 @@ class JvmManager {
}
public void run() {
runChild(env);
+
+ // Post-JVM-exit logs processing. Truncate the logs.
+ truncateJVMLogs();
}
public void runChild(JvmEnv env) {
@@ -446,7 +458,14 @@ class JvmManager {
removeJvm(jvmId);
}
}
-
+
+ // Post-JVM-exit logs processing. Truncate the logs.
+ private void truncateJVMLogs() {
+ Task firstTask = initalContext.task;
+ tracker.getTaskLogsMonitor().addProcessForLogTruncation(
+ firstTask.getTaskID(), tasksGiven);
+ }
+
public void taskRan() {
busy = false;
numTasksRan++;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=1077082&r1=1077081&r2=1077082&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java Fri Mar 4 03:38:37 2011
@@ -28,8 +28,11 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Enumeration;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -69,33 +72,58 @@ public class TaskLog {
public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
return new File(getBaseDir(taskid.toString()), filter.toString());
}
+
+ /**
+ * @deprecated Instead use
+ * {@link #getAllLogsFileDetails(TaskAttemptID, boolean)} to get
+ * the details of all log-files and then use the particular
+ * log-type's detail to call getRealTaskLogFilePath(String, LogName)
+ * to get the real log-location.
+ */
+ @Deprecated
public static File getRealTaskLogFileLocation(TaskAttemptID taskid,
LogName filter) {
LogFileDetail l;
try {
- l = getTaskLogFileDetail(taskid, filter);
+ Map<LogName, LogFileDetail> allFilesDetails =
+ getAllLogsFileDetails(taskid, false);
+ l = allFilesDetails.get(filter);
} catch (IOException ie) {
- LOG.error("getTaskLogFileDetail threw an exception " + ie);
+ LOG.error("getTaskLogFileDetailgetAllLogsFileDetails threw an exception "
+ + ie);
return null;
}
return new File(getBaseDir(l.location), filter.toString());
}
- private static class LogFileDetail {
+
+ /**
+ * Get the real task-log file-path
+ *
+ * @param location Location of the log-file. This should point to an
+ * attempt-directory.
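+ * @param filter Type of the log-file whose path is needed.
+ * @return Path of the log-file of the given type under the given location.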
+ * @throws IOException
+ */
+ static String getRealTaskLogFilePath(String location, LogName filter)
+ throws IOException {
+ return FileUtil.makeShellPath(new File(getBaseDir(location),
+ filter.toString()));
+ }
+
+ static class LogFileDetail {
final static String LOCATION = "LOG_DIR:";
String location;
long start;
long length;
}
-
- private static LogFileDetail getTaskLogFileDetail(TaskAttemptID taskid,
- LogName filter) throws IOException {
- return getLogFileDetail(taskid, filter, false);
- }
-
- private static LogFileDetail getLogFileDetail(TaskAttemptID taskid,
- LogName filter,
- boolean isCleanup)
- throws IOException {
+
+ static Map<LogName, LogFileDetail> getAllLogsFileDetails(
+ TaskAttemptID taskid, boolean isCleanup) throws IOException {
+
+ Map<LogName, LogFileDetail> allLogsFileDetails =
+ new HashMap<LogName, LogFileDetail>();
+
File indexFile = getIndexFile(taskid.toString(), isCleanup);
BufferedReader fis = new BufferedReader(new java.io.FileReader(indexFile));
//the format of the index file is
@@ -103,36 +131,37 @@ public class TaskLog {
//stdout:<start-offset in the stdout file> <length>
//stderr:<start-offset in the stderr file> <length>
//syslog:<start-offset in the syslog file> <length>
- LogFileDetail l = new LogFileDetail();
String str = fis.readLine();
if (str == null) { //the file doesn't have anything
throw new IOException ("Index file for the log of " + taskid+" doesn't exist.");
}
- l.location = str.substring(str.indexOf(LogFileDetail.LOCATION)+
+ String loc = str.substring(str.indexOf(LogFileDetail.LOCATION)+
LogFileDetail.LOCATION.length());
//special cases are the debugout and profile.out files. They are guaranteed
//to be associated with each task attempt since jvm reuse is disabled
//when profiling/debugging is enabled
- if (filter.equals(LogName.DEBUGOUT) || filter.equals(LogName.PROFILE)) {
+ for (LogName filter : new LogName[] { LogName.DEBUGOUT, LogName.PROFILE }) {
+ LogFileDetail l = new LogFileDetail();
+ l.location = loc;
l.length = new File(getBaseDir(l.location), filter.toString()).length();
l.start = 0;
- fis.close();
- return l;
+ allLogsFileDetails.put(filter, l);
}
str = fis.readLine();
while (str != null) {
- //look for the exact line containing the logname
- if (str.contains(filter.toString())) {
- str = str.substring(filter.toString().length()+1);
- String[] startAndLen = str.split(" ");
- l.start = Long.parseLong(startAndLen[0]);
- l.length = Long.parseLong(startAndLen[1]);
- break;
- }
+ LogFileDetail l = new LogFileDetail();
+ l.location = loc;
+ int idx = str.indexOf(':');
+ LogName filter = LogName.valueOf(str.substring(0, idx).toUpperCase());
+ str = str.substring(idx + 1);
+ String[] startAndLen = str.split(" ");
+ l.start = Long.parseLong(startAndLen[0]);
+ l.length = Long.parseLong(startAndLen[1]);
+ allLogsFileDetails.put(filter, l);
str = fis.readLine();
}
fis.close();
- return l;
+ return allLogsFileDetails;
}
private static File getTmpIndexFile(String taskid) {
@@ -150,16 +179,30 @@ public class TaskLog {
}
}
- private static File getBaseDir(String taskid) {
+ static File getBaseDir(String taskid) {
return new File(LOG_DIR, taskid);
}
- private static long prevOutLength;
- private static long prevErrLength;
- private static long prevLogLength;
+
+ static final List<LogName> LOGS_TRACKED_BY_INDEX_FILES =
+ Arrays.asList(LogName.STDOUT, LogName.STDERR, LogName.SYSLOG);
+
+ private static TaskAttemptID currentTaskid;
+
+ /**
+ * Map to store previous and current lengths.
+ */
+ private static Map<LogName, Long[]> logLengths =
+ new HashMap<LogName, Long[]>();
+ static {
+ for (LogName logName : LOGS_TRACKED_BY_INDEX_FILES) {
+ logLengths.put(logName, new Long[] { Long.valueOf(0L),
+ Long.valueOf(0L) });
+ }
+ }
- private static void writeToIndexFile(TaskAttemptID firstTaskid,
- boolean isCleanup)
- throws IOException {
+ static void writeToIndexFile(TaskAttemptID firstTaskid,
+ TaskAttemptID currentTaskid, boolean isCleanup,
+ Map<LogName, Long[]> lengths) throws IOException {
// To ensure atomicity of updates to index file, write to temporary index
// file first and then rename.
File tmpIndexFile = getTmpIndexFile(currentTaskid.toString());
@@ -172,17 +215,15 @@ public class TaskLog {
//STDOUT: <start-offset in the stdout file> <length>
//STDERR: <start-offset in the stderr file> <length>
//SYSLOG: <start-offset in the syslog file> <length>
- dos.writeBytes(LogFileDetail.LOCATION + firstTaskid.toString()+"\n"+
- LogName.STDOUT.toString()+":");
- dos.writeBytes(Long.toString(prevOutLength)+" ");
- dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.STDOUT)
- .length() - prevOutLength)+"\n"+LogName.STDERR+":");
- dos.writeBytes(Long.toString(prevErrLength)+" ");
- dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.STDERR)
- .length() - prevErrLength)+"\n"+LogName.SYSLOG.toString()+":");
- dos.writeBytes(Long.toString(prevLogLength)+" ");
- dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.SYSLOG)
- .length() - prevLogLength)+"\n");
+ dos.writeBytes(LogFileDetail.LOCATION
+ + firstTaskid.toString()
+ + "\n");
+ for (LogName logName : LOGS_TRACKED_BY_INDEX_FILES) {
+ Long[] lens = lengths.get(logName);
+ dos.writeBytes(logName.toString() + ":"
+ + lens[0].toString() + " "
+ + Long.toString(lens[1].longValue() - lens[0].longValue())
+ + "\n");}
dos.close();
File indexFile = getIndexFile(currentTaskid.toString(), isCleanup);
@@ -194,19 +235,13 @@ public class TaskLog {
}
localFS.rename (tmpIndexFilePath, indexFilePath);
}
- private static void resetPrevLengths(TaskAttemptID firstTaskid) {
- prevOutLength = getTaskLogFile(firstTaskid, LogName.STDOUT).length();
- prevErrLength = getTaskLogFile(firstTaskid, LogName.STDERR).length();
- prevLogLength = getTaskLogFile(firstTaskid, LogName.SYSLOG).length();
- }
- private volatile static TaskAttemptID currentTaskid = null;
public synchronized static void syncLogs(TaskAttemptID firstTaskid,
TaskAttemptID taskid)
throws IOException {
syncLogs(firstTaskid, taskid, false);
}
-
+
@SuppressWarnings("unchecked")
public synchronized static void syncLogs(TaskAttemptID firstTaskid,
TaskAttemptID taskid,
@@ -225,11 +260,25 @@ public class TaskLog {
}
}
}
+ // set start and end
+ for (LogName logName : LOGS_TRACKED_BY_INDEX_FILES) {
+ if (currentTaskid != taskid) {
+ // Set start = current-end
+ logLengths.get(logName)[0] =
+ Long.valueOf(getTaskLogFile(firstTaskid, logName).length());
+ }
+ // Set current end
+ logLengths.get(logName)[1] =
+ Long.valueOf(getTaskLogFile(firstTaskid, logName).length());
+ }
if (currentTaskid != taskid) {
+ if (currentTaskid != null) {
+ LOG.info("Starting logging for a new task " + taskid
+ + " in the same JVM as that of the first task " + firstTaskid);
+ }
currentTaskid = taskid;
- resetPrevLengths(firstTaskid);
}
- writeToIndexFile(firstTaskid, isCleanup);
+ writeToIndexFile(firstTaskid, taskid, isCleanup, logLengths);
}
/**
@@ -275,6 +324,7 @@ public class TaskLog {
return file.lastModified() < purgeTimeStamp;
}
}
+
/**
* Purge old user logs.
*
@@ -319,7 +369,9 @@ public class TaskLog {
public Reader(TaskAttemptID taskid, LogName kind,
long start, long end, boolean isCleanup) throws IOException {
// find the right log file
- LogFileDetail fileDetail = getLogFileDetail(taskid, kind, isCleanup);
+ Map<LogName, LogFileDetail> allFilesDetails =
+ getAllLogsFileDetails(taskid, isCleanup);
+ LogFileDetail fileDetail = allFilesDetails.get(kind);
// calculate the start and stop
long size = fileDetail.length;
if (start < 0) {
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsMonitor.java?rev=1077082&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsMonitor.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsMonitor.java Fri Mar 4 03:38:37 2011
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.mapred.TaskLog.LogName;
+import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
+import org.apache.hadoop.util.StringUtils;
+
+class TaskLogsMonitor extends Thread {
+ static final Log LOG = LogFactory.getLog(TaskLogsMonitor.class);
+
+ long mapRetainSize, reduceRetainSize;
+
+ public TaskLogsMonitor(long mapRetSize, long reduceRetSize) {
+ mapRetainSize = mapRetSize;
+ reduceRetainSize = reduceRetSize;
+ LOG.info("Starting logs' monitor with mapRetainSize=" + mapRetainSize
+ + " and reduceRetainSize=" + reduceRetSize);
+ }
+
+ /**
+ * The processes (JVMs/debug scripts) that have finished, keyed by their
+ * first attempt, and so need the logs of their attempts to be
+ */
+ private Map<TaskAttemptID, PerJVMInfo> finishedJVMs =
+ new HashMap<TaskAttemptID, PerJVMInfo>();
+
+ private static final int DEFAULT_BUFFER_SIZE = 4 * 1024;
+
+ static final int MINIMUM_RETAIN_SIZE_FOR_TRUNCATION = 0;
+
+ private static class PerJVMInfo {
+
+ List<Task> allAttempts;
+
+ public PerJVMInfo(List<Task> allAtmpts) {
+ this.allAttempts = allAtmpts;
+ }
+ }
+
+ /**
+ * Process(JVM/debug script) has finished. Asynchronously truncate the logs of
+ * all the corresponding tasks to the configured limit. In case of JVM, both
+ * the firstAttempt as well as the list of all attempts that ran in the same
+ * JVM have to be passed. For debug script, the (only) attempt itself should
+ * be passed as both the firstAttempt as well as the list of attempts.
+ *
+ * @param firstAttempt the first attempt that ran in the process
+ * @param allAttempts all the attempts that ran in the same process
+ */
+ void addProcessForLogTruncation(TaskAttemptID firstAttempt,
+ List<Task> allAttempts) {
+ LOG.info("Adding the jvm with first-attempt " + firstAttempt
+ + " for logs' truncation");
+ PerJVMInfo lInfo = new PerJVMInfo(allAttempts);
+ synchronized (finishedJVMs) {
+ finishedJVMs.put(firstAttempt, lInfo);
+ finishedJVMs.notify();
+ }
+ }
+
+ /**
+ * Process the removed task's logs. This involves truncating them to
+ * retainSize.
+ */
+ void truncateLogs(TaskAttemptID firstAttempt, PerJVMInfo lInfo) {
+
+ // Read the log-file details for all the attempts that ran in this JVM
+ Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails;
+ try {
+ taskLogFileDetails = getAllLogsFileDetails(lInfo.allAttempts);
+ } catch (IOException e) {
+ LOG.warn(
+ "Exception in truncateLogs while getting allLogsFileDetails()."
+ + " Ignoring the truncation of logs of this process.", e);
+ return;
+ }
+
+ Map<Task, Map<LogName, LogFileDetail>> updatedTaskLogFileDetails =
+ new HashMap<Task, Map<LogName, LogFileDetail>>();
+
+ File attemptLogDir = TaskLog.getBaseDir(firstAttempt.toString());
+
+ FileWriter tmpFileWriter;
+ FileReader logFileReader;
+ // Now truncate file by file
+ logNameLoop: for (LogName logName : LogName.values()) {
+
+ File logFile = TaskLog.getTaskLogFile(firstAttempt, logName);
+
+ // //// Optimization: if no task is over limit, just skip truncation-code
+ if (logFile.exists()
+ && !isTruncationNeeded(lInfo, taskLogFileDetails, logName)) {
+ LOG.debug("Truncation is not needed for "
+ + logFile.getAbsolutePath());
+ continue;
+ }
+ // //// End of optimization
+
+ // Truncation is needed for this log-file. Go ahead now.
+ File tmpFile = new File(attemptLogDir, "truncate.tmp");
+ try {
+ tmpFileWriter = new FileWriter(tmpFile);
+ } catch (IOException ioe) {
+ LOG.warn("Cannot open " + tmpFile.getAbsolutePath()
+ + " for writing truncated log-file "
+ + logFile.getAbsolutePath()
+ + ". Continuing with other log files. ", ioe);
+ continue;
+ }
+
+ try {
+ logFileReader = new FileReader(logFile);
+ } catch (FileNotFoundException fe) {
+ LOG.warn("Cannot open " + logFile.getAbsolutePath()
+ + " for reading. Continuing with other log files");
+ if (!tmpFile.delete()) {
+ LOG.warn("Cannot delete tmpFile " + tmpFile.getAbsolutePath());
+ }
+ continue;
+ }
+
+ long newCurrentOffset = 0;
+ // Process each attempt from the ordered list passed.
+ for (Task task : lInfo.allAttempts) {
+
+ // Truncate the log files of this task-attempt so that only the last
+ // retainSize many bytes of this log file is retained and the log
+ // file is reduced in size saving disk space.
+ long retainSize =
+ (task.isMapTask() ? mapRetainSize : reduceRetainSize);
+ LogFileDetail newLogFileDetail = new LogFileDetail();
+ try {
+ newLogFileDetail =
+ truncateALogFileOfAnAttempt(task.getTaskID(),
+ taskLogFileDetails.get(task).get(logName), retainSize,
+ tmpFileWriter, logFileReader);
+ } catch (IOException ioe) {
+ LOG.warn("Cannot truncate the log file "
+ + logFile.getAbsolutePath()
+ + ". Caught exception while handling " + task.getTaskID(),
+ ioe);
+ // revert back updatedTaskLogFileDetails
+ revertIndexFileInfo(lInfo, taskLogFileDetails,
+ updatedTaskLogFileDetails, logName);
+ if (!tmpFile.delete()) {
+ LOG.warn("Cannot delete tmpFile " + tmpFile.getAbsolutePath());
+ }
+ continue logNameLoop;
+ }
+
+ // Track information for updating the index file properly.
+ // Index files don't track DEBUGOUT and PROFILE logs, so skip'em.
+ if (TaskLog.LOGS_TRACKED_BY_INDEX_FILES.contains(logName)) {
+ if (!updatedTaskLogFileDetails.containsKey(task)) {
+ updatedTaskLogFileDetails.put(task,
+ new HashMap<LogName, LogFileDetail>());
+ }
+ // newLogFileDetail already has the location and length set, just
+ // set the start offset now.
+ newLogFileDetail.start = newCurrentOffset;
+ updatedTaskLogFileDetails.get(task).put(logName, newLogFileDetail);
+ newCurrentOffset += newLogFileDetail.length;
+ }
+ }
+
+ try {
+ tmpFileWriter.close();
+ } catch (IOException ioe) {
+ LOG.warn("Couldn't close the tmp file " + tmpFile.getAbsolutePath()
+ + ". Deleting it.", ioe);
+ revertIndexFileInfo(lInfo, taskLogFileDetails,
+ updatedTaskLogFileDetails, logName);
+ if (!tmpFile.delete()) {
+ LOG.warn("Cannot delete tmpFile " + tmpFile.getAbsolutePath());
+ }
+ continue;
+ }
+
+ if (!tmpFile.renameTo(logFile)) {
+ // If the tmpFile cannot be renamed revert back
+ // updatedTaskLogFileDetails to maintain the consistency of the
+ // original log file
+ revertIndexFileInfo(lInfo, taskLogFileDetails,
+ updatedTaskLogFileDetails, logName);
+ if (!tmpFile.delete()) {
+ LOG.warn("Cannot delete tmpFile " + tmpFile.getAbsolutePath());
+ }
+ }
+ }
+
+ // Update the index files
+ updateIndicesAfterLogTruncation(firstAttempt, updatedTaskLogFileDetails);
+ }
+
+ /**
+ * @param lInfo
+ * @param taskLogFileDetails
+ * @param updatedTaskLogFileDetails
+ * @param logName
+ */
+ private void revertIndexFileInfo(PerJVMInfo lInfo,
+ Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails,
+ Map<Task, Map<LogName, LogFileDetail>> updatedTaskLogFileDetails,
+ LogName logName) {
+ if (TaskLog.LOGS_TRACKED_BY_INDEX_FILES.contains(logName)) {
+ for (Task task : lInfo.allAttempts) {
+ if (!updatedTaskLogFileDetails.containsKey(task)) {
+ updatedTaskLogFileDetails.put(task,
+ new HashMap<LogName, LogFileDetail>());
+ }
+ updatedTaskLogFileDetails.get(task).put(logName,
+ taskLogFileDetails.get(task).get(logName));
+ }
+ }
+ }
+
+ /**
+ * Get the logFileDetails of all the list of attempts passed.
+ *
+ * @param allAttempts the attempts whose log-file details are to be read
+ * @return a map of task to the log-file detail
+ * @throws IOException
+ */
+ private Map<Task, Map<LogName, LogFileDetail>> getAllLogsFileDetails(
+ final List<Task> allAttempts) throws IOException {
+ Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails =
+ new HashMap<Task, Map<LogName, LogFileDetail>>();
+ for (Task task : allAttempts) {
+ Map<LogName, LogFileDetail> allLogsFileDetails;
+ allLogsFileDetails =
+ TaskLog.getAllLogsFileDetails(task.getTaskID(),
+ task.isTaskCleanupTask());
+ taskLogFileDetails.put(task, allLogsFileDetails);
+ }
+ return taskLogFileDetails;
+ }
+
+ /**
+ * Check if truncation of logs is needed for the given jvmInfo. If all the
+ * tasks that ran in a JVM are within the log-limits, then truncation is not
+ * needed. Otherwise it is needed.
+ *
+ * @param lInfo
+ * @param taskLogFileDetails
+ * @param logName
+ * @return true if truncation is needed, false otherwise
+ */
+ private boolean isTruncationNeeded(PerJVMInfo lInfo,
+ Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails,
+ LogName logName) {
+ boolean truncationNeeded = false;
+ LogFileDetail logFileDetail = null;
+ for (Task task : lInfo.allAttempts) {
+ long taskRetainSize =
+ (task.isMapTask() ? mapRetainSize : reduceRetainSize);
+ Map<LogName, LogFileDetail> allLogsFileDetails =
+ taskLogFileDetails.get(task);
+ logFileDetail = allLogsFileDetails.get(logName);
+ if (taskRetainSize > MINIMUM_RETAIN_SIZE_FOR_TRUNCATION
+ && logFileDetail.length > taskRetainSize) {
+ truncationNeeded = true;
+ break;
+ }
+ }
+ return truncationNeeded;
+ }
+
+ /**
+ * Truncate the log file of this task-attempt so that only the last retainSize
+ * many bytes of each log file is retained and the log file is reduced in size
+ * saving disk space.
+ *
+ * @param taskID Task whose logs need to be truncated
+ * @param oldLogFileDetail contains the original log details for the attempt
+ * @param taskRetainSize retain-size
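+ * @param tmpFileWriter New log file to write to. Already opened by the
+ * caller; the retained part of this attempt's log is written to it.
+ * @param logFileReader Original log file to read from.
+ * @return the new log-file detail with location and length set; start is not set here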
+ * @throws IOException
+ */
+ private LogFileDetail truncateALogFileOfAnAttempt(
+ final TaskAttemptID taskID, final LogFileDetail oldLogFileDetail,
+ final long taskRetainSize, final FileWriter tmpFileWriter,
+ final FileReader logFileReader) throws IOException {
+ LogFileDetail newLogFileDetail = new LogFileDetail();
+
+ // ///////////// Truncate log file ///////////////////////
+
+ // New location of log file is same as the old
+ newLogFileDetail.location = oldLogFileDetail.location;
+ if (taskRetainSize > MINIMUM_RETAIN_SIZE_FOR_TRUNCATION
+ && oldLogFileDetail.length > taskRetainSize) {
+ LOG.info("Truncating logs for " + taskID + " from "
+ + oldLogFileDetail.length + "bytes to " + taskRetainSize
+ + "bytes.");
+ newLogFileDetail.length = taskRetainSize;
+ } else {
+ LOG.info("No truncation needed for " + taskID + " length is "
+ + oldLogFileDetail.length + " retain size " + taskRetainSize
+ + "bytes.");
+ newLogFileDetail.length = oldLogFileDetail.length;
+ }
+ long charsSkipped =
+ logFileReader.skip(oldLogFileDetail.length
+ - newLogFileDetail.length);
+ if (charsSkipped != oldLogFileDetail.length - newLogFileDetail.length) {
+ throw new IOException("Erroneously skipped " + charsSkipped
+ + " instead of the expected "
+ + (oldLogFileDetail.length - newLogFileDetail.length));
+ }
+ long alreadyRead = 0;
+ while (alreadyRead < newLogFileDetail.length) {
+ char tmpBuf[]; // Temporary buffer to read logs
+ if (newLogFileDetail.length - alreadyRead >= DEFAULT_BUFFER_SIZE) {
+ tmpBuf = new char[DEFAULT_BUFFER_SIZE];
+ } else {
+ tmpBuf = new char[(int) (newLogFileDetail.length - alreadyRead)];
+ }
+ int bytesRead = logFileReader.read(tmpBuf);
+ if (bytesRead < 0) {
+ break;
+ } else {
+ alreadyRead += bytesRead;
+ }
+ tmpFileWriter.write(tmpBuf);
+ }
+ // ////// End of truncating log file ///////////////////////
+
+ return newLogFileDetail;
+ }
+
+ /**
+ * Truncation of logs is done. Now sync the index files to reflect the
+ * truncated sizes.
+ *
+ * @param firstAttempt
+ * @param updatedTaskLogFileDetails
+ */
+ private void updateIndicesAfterLogTruncation(TaskAttemptID firstAttempt,
+ Map<Task, Map<LogName, LogFileDetail>> updatedTaskLogFileDetails) {
+ for (Entry<Task, Map<LogName, LogFileDetail>> entry :
+ updatedTaskLogFileDetails.entrySet()) {
+ Task task = entry.getKey();
+ Map<LogName, LogFileDetail> logFileDetails = entry.getValue();
+ Map<LogName, Long[]> logLengths = new HashMap<LogName, Long[]>();
+ // set current and previous lengths
+ for (LogName logName : TaskLog.LOGS_TRACKED_BY_INDEX_FILES) {
+ logLengths.put(logName, new Long[] { Long.valueOf(0L),
+ Long.valueOf(0L) });
+ LogFileDetail lfd = logFileDetails.get(logName);
+ if (lfd != null) {
+ // Set previous lengths
+ logLengths.get(logName)[0] = Long.valueOf(lfd.start);
+ // Set current lengths
+ logLengths.get(logName)[1] = Long.valueOf(lfd.start + lfd.length);
+ }
+ }
+ try {
+ TaskLog.writeToIndexFile(firstAttempt, task.getTaskID(),
+ task.isTaskCleanupTask(), logLengths);
+ } catch (IOException ioe) {
+ LOG.warn("Exception in updateIndicesAfterLogTruncation : "
+ + StringUtils.stringifyException(ioe));
+ LOG.warn("Exception encountered while updating index file of task "
+ + task.getTaskID()
+ + ". Ignoring and continuing with other tasks.");
+ }
+ }
+ }
+
+ /**
+ * Truncate the logs of all the processes queued for truncation so far.
+ * @throws IOException
+ */
+ void monitorTaskLogs() throws IOException {
+
+ Map<TaskAttemptID, PerJVMInfo> tasksBeingTruncated =
+ new HashMap<TaskAttemptID, PerJVMInfo>();
+
+ // Start monitoring newly added finishedJVMs
+ synchronized (finishedJVMs) {
+ tasksBeingTruncated.clear();
+ tasksBeingTruncated.putAll(finishedJVMs);
+ finishedJVMs.clear();
+ }
+
+ for (Entry<TaskAttemptID, PerJVMInfo> entry :
+ tasksBeingTruncated.entrySet()) {
+ truncateLogs(entry.getKey(), entry.getValue());
+ }
+ }
+
+ @Override
+ public void run() {
+
+ while (true) {
+ try {
+ monitorTaskLogs();
+ try {
+ synchronized (finishedJVMs) {
+ while (finishedJVMs.isEmpty()) {
+ finishedJVMs.wait();
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.warn(getName() + " is interrupted. Returning");
+ return;
+ }
+ } catch (Throwable e) {
+ LOG.warn(getName()
+ + " encountered an exception while monitoring : "
+ + StringUtils.stringifyException(e));
+ LOG.info("Ingoring the exception and continuing monitoring.");
+ }
+ }
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077082&r1=1077081&r2=1077082&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 4 03:38:37 2011
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
- package org.apache.hadoop.mapred;
+package org.apache.hadoop.mapred;
import java.io.File;
import java.io.FileNotFoundException;
@@ -26,6 +26,7 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -65,6 +66,8 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
+import org.apache.hadoop.mapred.TaskLog.LogName;
import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
import org.apache.hadoop.mapred.TaskController.TaskControllerPathDeletionContext;
import org.apache.hadoop.mapred.TaskStatus.Phase;
@@ -112,7 +115,12 @@ public class TaskTracker
@Deprecated
static final String MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY =
"mapred.tasktracker.pmem.reserved";
-
+
+ static final String MAP_USERLOG_RETAIN_SIZE =
+ "mapreduce.cluster.map.userlog.retain-size";
+ static final String REDUCE_USERLOG_RETAIN_SIZE =
+ "mapreduce.cluster.reduce.userlog.retain-size";
+
static final long WAIT_FOR_DONE = 3 * 1000;
private int httpPort;
@@ -233,6 +241,8 @@ public class TaskTracker
private long reduceSlotSizeMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
private long totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;
+ private TaskLogsMonitor taskLogsMonitor;
+
static final String MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY =
"mapred.tasktracker.memory_calculator_plugin";
@@ -402,6 +412,14 @@ public class TaskTracker
}
}
+ TaskLogsMonitor getTaskLogsMonitor() {
+ return this.taskLogsMonitor;
+ }
+
+ void setTaskLogsMonitor(TaskLogsMonitor t) {
+ this.taskLogsMonitor = t;
+ }
+
static String getCacheSubdir() {
return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
}
@@ -561,6 +579,10 @@ public class TaskTracker
initializeMemoryManagement();
+ setTaskLogsMonitor(new TaskLogsMonitor(getMapUserLogRetainSize(),
+ getReduceUserLogRetainSize()));
+ getTaskLogsMonitor().start();
+
this.indexCache = new IndexCache(this.fConf);
mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots);
@@ -924,7 +946,11 @@ public class TaskTracker
//stop the launchers
this.mapLauncher.interrupt();
this.reduceLauncher.interrupt();
-
+
+ // All tasks are killed. So, they are removed from TaskLog monitoring also.
+ // Interrupt the monitor.
+ getTaskLogsMonitor().interrupt();
+
jvmManager.stop();
// shutdown RPC connections
@@ -991,6 +1017,13 @@ public class TaskTracker
initialize();
}
+ /**
+ * Blank constructor. Only usable by tests.
+ */
+ TaskTracker() {
+ server = null;
+ }
+
private void checkJettyPort(int port) throws IOException {
//See HADOOP-4744
if (port < 0) {
@@ -1331,6 +1364,22 @@ public class TaskTracker
return heartbeatResponse;
}
+ long getMapUserLogRetainSize() {
+ return fConf.getLong(MAP_USERLOG_RETAIN_SIZE, -1); // -1 if unset; TaskLogsMonitor treats < 0 as "off"
+ }
+
+ void setMapUserLogRetainSize(long retainSize) {
+ fConf.setLong(MAP_USERLOG_RETAIN_SIZE, retainSize);
+ }
+
+ long getReduceUserLogRetainSize() {
+ return fConf.getLong(REDUCE_USERLOG_RETAIN_SIZE, -1); // -1 if unset; TaskLogsMonitor treats < 0 as "off"
+ }
+
+ void setReduceUserLogRetainSize(long retainSize) {
+ fConf.setLong(REDUCE_USERLOG_RETAIN_SIZE, retainSize);
+ }
+
/**
* Return the total virtual memory available on this TaskTracker.
* @return total size of virtual memory.
@@ -2252,18 +2301,23 @@ public class TaskTracker
String taskSyslog ="";
String jobConf = task.getJobFile();
try {
- // get task's stdout file
- taskStdout = FileUtil.makeShellPath(
- TaskLog.getRealTaskLogFileLocation
- (task.getTaskID(), TaskLog.LogName.STDOUT));
- // get task's stderr file
- taskStderr = FileUtil.makeShellPath(
- TaskLog.getRealTaskLogFileLocation
- (task.getTaskID(), TaskLog.LogName.STDERR));
- // get task's syslog file
- taskSyslog = FileUtil.makeShellPath(
- TaskLog.getRealTaskLogFileLocation
- (task.getTaskID(), TaskLog.LogName.SYSLOG));
+ Map<LogName, LogFileDetail> allFilesDetails =
+ TaskLog.getAllLogsFileDetails(task.getTaskID(), false);
+ // get task's stdout file
+ taskStdout =
+ TaskLog.getRealTaskLogFilePath(
+ allFilesDetails.get(LogName.STDOUT).location,
+ LogName.STDOUT);
+ // get task's stderr file
+ taskStderr =
+ TaskLog.getRealTaskLogFilePath(
+ allFilesDetails.get(LogName.STDERR).location,
+ LogName.STDERR);
+ // get task's syslog file
+ taskSyslog =
+ TaskLog.getRealTaskLogFilePath(
+ allFilesDetails.get(LogName.SYSLOG).location,
+ LogName.SYSLOG);
} catch(IOException e){
LOG.warn("Exception finding task's stdout/err/syslog files");
}
@@ -2325,6 +2379,11 @@ public class TaskTracker
} catch(IOException ioe) {
LOG.warn("Exception in add diagnostics!");
}
+
+ // Debug-command is run. Do the post-debug-script-exit debug-logs
+ // processing. Truncate the logs.
+ getTaskLogsMonitor().addProcessForLogTruncation(
+ task.getTaskID(), Arrays.asList(task));
}
}
taskStatus.setProgress(0.0f);
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java?rev=1077082&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java Fri Mar 4 03:38:37 2011
@@ -0,0 +1,500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
+import org.apache.hadoop.mapred.TaskLog.LogName;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+
+import org.junit.After;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verify the logs' monitoring functionality.
+ */
+public class TestTaskLogsMonitor {
+
+ static final Log LOG = LogFactory.getLog(TestTaskLogsMonitor.class);
+
+  /**
+   * Make each attempt-dir writable and delete it; no-op if none can be listed.
+   */
+  @After
+  public void tearDown() throws IOException {
+    File[] attemptDirs = TaskLog.getUserLogDir().listFiles();
+    if (attemptDirs == null) {
+      return; // listFiles() yields null for a missing or unreadable dir
+    }
+    for (File attemptDir : attemptDirs) {
+      attemptDir.setWritable(true);
+      FileUtil.fullyDelete(attemptDir);
+    }
+  }
+
+  /**
+   * Append numBytes copies of data to the logName file of firstAttemptID,
+   * calling TaskLog.syncLogs for attemptID both before and after the write.
+   */
+  void writeRealBytes(TaskAttemptID firstAttemptID,
+      TaskAttemptID attemptID, LogName logName, long numBytes, char data)
+      throws IOException {
+    File logFile = TaskLog.getTaskLogFile(firstAttemptID, logName);
+    LOG.info("Going to write " + numBytes + " real bytes to the log file "
+        + logFile);
+    File logDir = logFile.getParentFile();
+    if (!logDir.exists() && !logDir.mkdirs()) {
+      throw new IOException("Couldn't create all ancestor dirs for "
+          + logFile);
+    }
+    File attemptDir = TaskLog.getBaseDir(attemptID.toString());
+    if (!attemptDir.exists() && !attemptDir.mkdirs()) {
+      throw new IOException("Couldn't create all ancestor dirs for "
+          + attemptDir); // report the dir that failed, not the log file
+    }
+    // Need to call up front to set currenttaskid.
+    TaskLog.syncLogs(firstAttemptID, attemptID);
+    FileWriter writer = new FileWriter(logFile, true);
+    try {
+      for (long i = 0; i < numBytes; i++) {
+        writer.write(data);
+      }
+    } finally {
+      writer.close(); // don't leak the handle when a write fails
+    }
+    TaskLog.syncLogs(firstAttemptID, attemptID);
+    LOG.info("Written " + numBytes + " real bytes to the log file " + logFile);
+  }
+
+  /**
+   * Read the per-log lengths recorded for tid; every log is reported as zero
+   * bytes when the attempt has no index file to read them from.
+   */
+  private static Map<LogName, Long> getAllLogsFileLengths(
+      TaskAttemptID tid, boolean isCleanup) throws IOException {
+    Map<LogName, Long> lengths = new HashMap<LogName, Long>();
+    boolean indexed = TaskLog.getIndexFile(tid.toString(), isCleanup).exists();
+    if (indexed) {
+      Map<LogName, LogFileDetail> details =
+          TaskLog.getAllLogsFileDetails(tid, isCleanup);
+      for (Map.Entry<LogName, LogFileDetail> e : details.entrySet()) {
+        lengths.put(e.getKey(), Long.valueOf(e.getValue().length));
+      }
+    } else {
+      for (LogName name : LogName.values()) {
+        lengths.put(name, Long.valueOf(0));
+      }
+    }
+    return lengths;
+  }
+
+  /**
+   * A syslog smaller than the retain-size must keep its full length after
+   * its attempt is queued for truncation. Without JVM-reuse.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testNoTruncationNeeded() throws IOException {
+    final long written = 500;
+    TaskTracker tracker = new TaskTracker();
+    TaskLogsMonitor monitor = new TaskLogsMonitor(1000L, 1000L);
+    tracker.setTaskLogsMonitor(monitor);
+
+    TaskAttemptID attempt = new TaskAttemptID(new TaskID(), 0);
+    Task mapTask = new MapTask(null, attempt, 0, null, null, 0, null);
+
+    // Stay below the 1000-byte retain-size.
+    writeRealBytes(attempt, attempt, LogName.SYSLOG, written, 'H');
+
+    monitor.monitorTaskLogs();
+    File attemptDir = TaskLog.getBaseDir(attempt.toString());
+    assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
+
+    // The task and its JVM are done: hand the attempt over for truncation.
+    monitor.addProcessForLogTruncation(attempt, Arrays.asList(mapTask));
+
+    // Nothing exceeds the limit, so the file keeps its full length ...
+    monitor.monitorTaskLogs();
+    assertTrue(attemptDir.exists());
+    File syslog = TaskLog.getTaskLogFile(attempt, LogName.SYSLOG);
+    assertEquals(written, syslog.length());
+    // ... and the index file agrees.
+    Map<LogName, Long> indexed = getAllLogsFileLengths(attempt, false);
+    assertEquals(written, indexed.get(LogName.SYSLOG).longValue());
+
+    // A further pass changes nothing.
+    monitor.monitorTaskLogs();
+    assertEquals(written, syslog.length());
+  }
+
+  /**
+   * With negative retain-sizes truncation is switched off, so a 1500-byte
+   * syslog must keep every byte.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testDisabledLogTruncation() throws IOException {
+    final long written = 1500;
+
+    TaskTracker tracker = new TaskTracker();
+    // Any retain-size below zero disables truncation.
+    TaskLogsMonitor monitor = new TaskLogsMonitor(-1L, -1L);
+    tracker.setTaskLogsMonitor(monitor);
+
+    TaskAttemptID attempt = new TaskAttemptID(new TaskID(), 0);
+    Task mapTask = new MapTask(null, attempt, 0, null, null, 0, null);
+
+    // Let the task write 1500 bytes of logs.
+    writeRealBytes(attempt, attempt, LogName.SYSLOG, written, 'H');
+
+    monitor.monitorTaskLogs();
+    File attemptDir = TaskLog.getBaseDir(attempt.toString());
+    assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
+
+    // The task and its JVM are done: hand the attempt over for truncation.
+    monitor.addProcessForLogTruncation(attempt, Arrays.asList(mapTask));
+
+    // Even so, the file must come through the next pass at full length ...
+    monitor.monitorTaskLogs();
+    assertTrue(attemptDir.exists());
+    File syslog = TaskLog.getTaskLogFile(attempt, LogName.SYSLOG);
+    assertEquals(written, syslog.length());
+    // ... and the index file must agree.
+    Map<LogName, Long> indexed = getAllLogsFileLengths(attempt, false);
+    assertEquals(written, indexed.get(LogName.SYSLOG).longValue());
+  }
+
+  /**
+   * A syslog larger than the retain-size must be cut down to exactly the
+   * retain-size once its attempt is queued for truncation. Without JVM-reuse.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testLogTruncationOnFinishing() throws IOException {
+    final long retainSize = 1000L;
+    TaskTracker tracker = new TaskTracker();
+    TaskLogsMonitor monitor = new TaskLogsMonitor(retainSize, retainSize);
+    tracker.setTaskLogsMonitor(monitor);
+
+    TaskAttemptID attempt = new TaskAttemptID(new TaskID(), 0);
+    Task mapTask = new MapTask(null, attempt, 0, null, null, 0, null);
+
+    // Overshoot the retain-size by 500 bytes.
+    writeRealBytes(attempt, attempt, LogName.SYSLOG, 1500, 'H');
+
+    monitor.monitorTaskLogs();
+    File attemptDir = TaskLog.getBaseDir(attempt.toString());
+    assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
+
+    // The task and its JVM are done: hand the attempt over for truncation.
+    monitor.addProcessForLogTruncation(attempt, Arrays.asList(mapTask));
+
+    // This pass must shrink the file to the retain-size ...
+    monitor.monitorTaskLogs();
+    assertTrue(attemptDir.exists());
+    File syslog = TaskLog.getTaskLogFile(attempt, LogName.SYSLOG);
+    assertEquals(retainSize, syslog.length());
+    // ... and record the new length in the index file.
+    Map<LogName, Long> indexed = getAllLogsFileLengths(attempt, false);
+    assertEquals(retainSize, indexed.get(LogName.SYSLOG).longValue());
+
+    // A further pass must not cut any deeper.
+    monitor.monitorTaskLogs();
+    assertEquals(retainSize, syslog.length());
+  }
+
+  /**
+   * Test the truncation of a log-file shared by three attempts that ran in
+   * one reused JVM: each attempt's share is capped at the retain-size.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testLogTruncationOnFinishingWithJVMReuse() throws IOException {
+    TaskTracker taskTracker = new TaskTracker();
+    TaskLogsMonitor logsMonitor = new TaskLogsMonitor(150L, 150L);
+    taskTracker.setTaskLogsMonitor(logsMonitor);
+
+    TaskID baseTaskID = new TaskID();
+    int attemptsCount = 0;
+
+    // Assuming the job's retain size is 150
+    TaskAttemptID attempt1 = new TaskAttemptID(baseTaskID, attemptsCount++);
+    Task task1 = new MapTask(null, attempt1, 0, null, null, 0, null);
+
+    // Let the first attempt write more than the retain-size (200 > 150).
+    writeRealBytes(attempt1, attempt1, LogName.SYSLOG, 200, 'A');
+
+    logsMonitor.monitorTaskLogs();
+
+    File attemptDir = TaskLog.getBaseDir(attempt1.toString());
+    assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
+
+    // Start another attempt in the same JVM
+    TaskAttemptID attempt2 = new TaskAttemptID(baseTaskID, attemptsCount++);
+    Task task2 = new MapTask(null, attempt2, 0, null, null, 0, null);
+    logsMonitor.monitorTaskLogs();
+
+    // Let attempt2 write less than the retain-size (100 < 150).
+    writeRealBytes(attempt1, attempt2, LogName.SYSLOG, 100, 'B');
+    logsMonitor.monitorTaskLogs();
+
+    // Start yet another attempt in the same JVM
+    TaskAttemptID attempt3 = new TaskAttemptID(baseTaskID, attemptsCount++);
+    Task task3 = new MapTask(null, attempt3, 0, null, null, 0, null);
+    logsMonitor.monitorTaskLogs();
+
+    // Let attempt3 write more than the retain-size again (225 > 150).
+    writeRealBytes(attempt1, attempt3, LogName.SYSLOG, 225, 'C');
+    logsMonitor.monitorTaskLogs();
+
+    // Finish the JVM.
+    logsMonitor.addProcessForLogTruncation(attempt1,
+        Arrays.asList((new Task[] { task1, task2, task3 })));
+
+    // The shared log-file should now be cut to 150 + 100 + 150 bytes.
+    logsMonitor.monitorTaskLogs();
+    assertTrue(attemptDir.exists());
+    File logFile = TaskLog.getTaskLogFile(attempt1, LogName.SYSLOG);
+    assertEquals(400, logFile.length());
+    // The index files should also be proper.
+    assertEquals(150, getAllLogsFileLengths(attempt1, false).get(
+        LogName.SYSLOG).longValue());
+    assertEquals(100, getAllLogsFileLengths(attempt2, false).get(
+        LogName.SYSLOG).longValue());
+    assertEquals(150, getAllLogsFileLengths(attempt3, false).get(
+        LogName.SYSLOG).longValue());
+
+    // Assert the data: 150 'A's, then 100 'B's, then 150 'C's.
+    FileReader reader = new FileReader(logFile);
+    boolean dataValid = true;
+    try {
+      int ch;
+      int position = 0; // 1-based index of the character just read
+      while ((ch = reader.read()) != -1) {
+        position++;
+        char expected;
+        if (position <= 150) {
+          expected = 'A';
+        } else if (position <= 250) {
+          expected = 'B';
+        } else {
+          expected = 'C';
+        }
+        if ((char) ch != expected) {
+          LOG.warn("Truncation didn't happen properly. At " + position
+              + "th byte, expected '" + expected + "' but found " + (char) ch);
+          dataValid = false;
+        }
+      }
+    } finally {
+      reader.close(); // release the handle even if reading fails
+    }
+    assertTrue("Log-truncation didn't happen properly!", dataValid);
+
+    // A further pass must leave the file as it is.
+    logsMonitor.monitorTaskLogs();
+    assertEquals(400, logFile.length());
+  }
+
+  /** Root directory URI for the tests' input/output, spaces replaced by '+'. */
+  private static final String TEST_ROOT_DIR = new File(
+      System.getProperty("test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
+
+  /** Identity mapper that prints 1000 lines to stdout on each map() call. */
+  public static class LoggingMapper<K, V> extends IdentityMapper<K, V> {
+    public void map(K key, V val, OutputCollector<K, V> output,
+        Reporter reporter) throws IOException {
+      final String noise =
+          "Lots of logs! Lots of logs! Waiting to be truncated! Lots of logs!";
+      for (int line = 0; line < 1000; line++) {
+        System.out.println(noise);
+      }
+      super.map(key, val, output, reporter);
+    }
+  }
+
+  /**
+   * Run a real job on a {@link MiniMRCluster} whose mapper floods stdout and
+   * check that no attempt's STDOUT log ends up above the retain-size.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testLogsMonitoringWithMiniMR() throws IOException {
+    final long retainSize = 10000L;
+    MiniMRCluster cluster = null;
+    try {
+      JobConf trackerConf = new JobConf();
+      trackerConf.setLong(TaskTracker.MAP_USERLOG_RETAIN_SIZE, retainSize);
+      trackerConf.setLong(TaskTracker.REDUCE_USERLOG_RETAIN_SIZE, retainSize);
+      cluster = new MiniMRCluster(1, "file:///", 3, null, null, trackerConf);
+      JobConf jobConf = cluster.createJobConf();
+
+      // Start from a clean output dir and a one-line input file.
+      Path inDir = new Path(TEST_ROOT_DIR + "/input");
+      Path outDir = new Path(TEST_ROOT_DIR + "/output");
+      FileSystem fs = FileSystem.get(jobConf);
+      if (fs.exists(outDir)) {
+        fs.delete(outDir, true);
+      }
+      if (!fs.exists(inDir)) {
+        fs.mkdirs(inDir);
+      }
+      DataOutputStream part = fs.create(new Path(inDir, "part-0"));
+      part.writeBytes("The quick brown fox jumped over the lazy dog");
+      part.close();
+
+      // Map-only job: one map task requested, no reducers.
+      jobConf.setInputFormat(TextInputFormat.class);
+      jobConf.setOutputKeyClass(LongWritable.class);
+      jobConf.setOutputValueClass(Text.class);
+      FileInputFormat.setInputPaths(jobConf, inDir);
+      FileOutputFormat.setOutputPath(jobConf, outDir);
+      jobConf.setNumMapTasks(1);
+      jobConf.setNumReduceTasks(0);
+      jobConf.setMapperClass(LoggingMapper.class);
+
+      RunningJob job = JobClient.runJob(jobConf);
+      assertTrue(job.getJobState() == JobStatus.SUCCEEDED);
+      for (TaskCompletionEvent event : job.getTaskCompletionEvents(0)) {
+        TaskAttemptID attempt = event.getTaskAttemptId();
+        File stdout = TaskLog.getTaskLogFile(attempt, LogName.STDOUT);
+        long length = stdout.length();
+        assertTrue("STDOUT log file length for " + attempt
+            + " is " + length + " and not <=10000", length <= retainSize);
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test the truncation of the DEBUGOUT file by {@link TaskLogsMonitor}.
+   * @throws IOException
+   */
+  @Test
+  public void testDebugLogsTruncationWithMiniMR() throws IOException {
+
+    MiniMRCluster mr = null;
+    try {
+      JobConf clusterConf = new JobConf();
+      clusterConf.setLong(TaskTracker.MAP_USERLOG_RETAIN_SIZE, 10000L);
+      clusterConf.setLong(TaskTracker.REDUCE_USERLOG_RETAIN_SIZE, 10000L);
+      mr = new MiniMRCluster(1, "file:///", 3, null, null, clusterConf);
+
+      JobConf conf = mr.createJobConf();
+
+      Path inDir = new Path(TEST_ROOT_DIR + "/input");
+      Path outDir = new Path(TEST_ROOT_DIR + "/output");
+      FileSystem fs = FileSystem.get(conf);
+      if (fs.exists(outDir)) {
+        fs.delete(outDir, true);
+      }
+      if (!fs.exists(inDir)) {
+        fs.mkdirs(inDir);
+      }
+      String input = "The quick brown fox jumped over the lazy dog";
+      DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+      file.writeBytes(input);
+      file.close();
+
+      conf.setInputFormat(TextInputFormat.class);
+      conf.setOutputKeyClass(LongWritable.class);
+      conf.setOutputValueClass(Text.class);
+
+      FileInputFormat.setInputPaths(conf, inDir);
+      FileOutputFormat.setOutputPath(conf, outDir);
+      conf.setNumMapTasks(1);
+      conf.setMaxMapAttempts(1);
+      conf.setNumReduceTasks(0);
+      conf.setMapperClass(TestMiniMRMapRedDebugScript.MapClass.class);
+
+      // copy debug script to cache from local file system.
+      Path scriptPath = new Path(TEST_ROOT_DIR, "debug-script.txt");
+      String debugScriptContent =
+          "for ((i=0;i<1000;i++)); " + "do "
+              + "echo \"Lots of logs! Lots of logs! "
+              + "Waiting to be truncated! Lots of logs!\";" + "done";
+      DataOutputStream scriptFile = fs.create(scriptPath);
+      scriptFile.writeBytes(debugScriptContent);
+      scriptFile.close();
+      new File(scriptPath.toUri().getPath()).setExecutable(true);
+
+      URI uri = scriptPath.toUri();
+      DistributedCache.createSymlink(conf);
+      DistributedCache.addCacheFile(uri, conf);
+      conf.setMapDebugScript(scriptPath.toUri().getPath());
+
+      // Submission failures propagate: there would be no job to inspect.
+      JobClient jc = new JobClient(conf);
+      RunningJob job = jc.submitJob(conf);
+      try {
+        jc.monitorAndPrintJob(conf, job);
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while monitoring the job", e);
+      } catch (IOException ioe) {
+        // Still check the logs below; just don't lose the failure.
+        LOG.warn("Monitoring the job failed", ioe);
+      } finally {
+        for (TaskCompletionEvent tce : job.getTaskCompletionEvents(0)) {
+          File debugOutFile =
+              TaskLog.getTaskLogFile(tce.getTaskAttemptId(),
+                  TaskLog.LogName.DEBUGOUT);
+          if (debugOutFile.exists()) {
+            long length = debugOutFile.length();
+            assertTrue("DEBUGOUT log file length for "
+                + tce.getTaskAttemptId() + " is " + length
+                + " and not =10000", length == 10000);
+          }
+        }
+      }
+    } finally {
+      if (mr != null) {
+        mr.shutdown();
+      }
+    }
+  }
+}