You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2010/03/09 11:56:45 UTC
svn commit: r920793 - in /hadoop/mapreduce/trunk: ./
src/c++/task-controller/ src/c++/task-controller/tests/ src/java/
src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/
src/java/org/apache/hadoop/mapreduce/server/tasktracker/ src...
Author: vinodkv
Date: Tue Mar 9 10:56:44 2010
New Revision: 920793
URL: http://svn.apache.org/viewvc?rev=920793&view=rev
Log:
MAPREDUCE-927. Cleanup of task-logs should happen in TaskTracker instead of the Child. Contributed by Amareshwari Sriramadasu.
Added:
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/UserLogCleaner.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h
hadoop/mapreduce/trunk/src/c++/task-controller/tests/test-task-controller.c
hadoop/mapreduce/trunk/src/java/mapred-default.xml
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Mar 9 10:56:44 2010
@@ -430,6 +430,9 @@ Trunk (unreleased changes)
MAPREDUCE-1573. TestStreamingAsDifferentUser fails if run as tt_user.
(Ravi Gummadi via vinodkv)
+ MAPREDUCE-927. Cleanup of task-logs should happen in TaskTracker instead
+ of the Child. (Amareshwari Sriramadasu via vinodkv)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/task-controller.c?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c Tue Mar 9 10:56:44 2010
@@ -213,9 +213,17 @@ char *get_task_dir_path(const char *tt_r
/**
* Get the log directory for the given attempt.
*/
-char *get_task_log_dir(const char *log_dir, const char *attempt_id) {
- return concatenate(ATTEMPT_LOG_DIR_PATTERN, "task_log_dir", 2, log_dir,
- attempt_id);
+char *get_task_log_dir(const char *log_dir, const char *job_id,
+ const char *attempt_id) {
+ return concatenate(ATTEMPT_LOG_DIR_PATTERN, "task_log_dir", 3, log_dir,
+ job_id, attempt_id);
+}
+
+/**
+ * Get the log directory for the given job.
+ */
+char *get_job_log_dir(const char *log_dir, const char *job_id) {
+ return concatenate(JOB_LOG_DIR_PATTERN, "job_log_dir", 2, log_dir, job_id);
}
/**
@@ -499,15 +507,61 @@ int prepare_attempt_directories(const ch
}
/**
+ * Function to prepare the job log dir for the child. It gives ownership
+ * of the job's log-dir to the user and group ownership to the
+ * user running tasktracker.
+ * * sudo chown user:mapred log-dir/userlogs/$jobid
+ * * sudo chmod -R 2770 log-dir/userlogs/$jobid // user is same as tt_user
+ * * sudo chmod -R 2570 log-dir/userlogs/$jobid // user is not tt_user
+ */
+int prepare_job_logs(const char *log_dir, const char *job_id,
+ mode_t permissions) {
+
+ char *job_log_dir = get_job_log_dir(log_dir, job_id);
+ if (job_log_dir == NULL) {
+ fprintf(LOGFILE, "Couldn't get job log directory %s.\n", job_log_dir);
+ return -1;
+ }
+
+ struct stat filestat;
+ if (stat(job_log_dir, &filestat) != 0) {
+ if (errno == ENOENT) {
+#ifdef DEBUG
+ fprintf(LOGFILE, "job_log_dir %s doesn't exist. Not doing anything.\n",
+ job_log_dir);
+#endif
+ free(job_log_dir);
+ return 0;
+ } else {
+ // stat failed because of something else!
+ fprintf(LOGFILE, "Failed to stat the job log dir %s\n", job_log_dir);
+ free(job_log_dir);
+ return -1;
+ }
+ }
+
+ gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
+ if (secure_path(job_log_dir, user_detail->pw_uid, tasktracker_gid,
+ permissions, S_ISGID | permissions, 1) != 0) {
+ fprintf(LOGFILE, "Failed to secure the log_dir %s\n", job_log_dir);
+ free(job_log_dir);
+ return -1;
+ }
+ free(job_log_dir);
+ return 0;
+}
+
+/**
* Function to prepare the task logs for the child. It gives the user
* ownership of the attempt's log-dir to the user and group ownership to the
* user running tasktracker.
- * * sudo chown user:mapred log-dir/userlogs/$attemptid
- * * sudo chmod -R 2770 log-dir/userlogs/$attemptid
+ * * sudo chown user:mapred log-dir/userlogs/$jobid/$attemptid
+ * * sudo chmod -R 2770 log-dir/userlogs/$jobid/$attemptid
*/
-int prepare_task_logs(const char *log_dir, const char *task_id) {
+int prepare_task_logs(const char *log_dir, const char *job_id,
+ const char *task_id) {
- char *task_log_dir = get_task_log_dir(log_dir, task_id);
+ char *task_log_dir = get_task_log_dir(log_dir, job_id, task_id);
if (task_log_dir == NULL) {
fprintf(LOGFILE, "Couldn't get task_log directory %s.\n", task_log_dir);
return -1;
@@ -525,10 +579,12 @@ int prepare_task_logs(const char *log_di
fprintf(LOGFILE, "task_log_dir %s doesn't exist. Not doing anything.\n",
task_log_dir);
#endif
+ free(task_log_dir);
return 0;
} else {
// stat failed because of something else!
fprintf(LOGFILE, "Failed to stat the task_log_dir %s\n", task_log_dir);
+ free(task_log_dir);
return -1;
}
}
@@ -538,8 +594,10 @@ int prepare_task_logs(const char *log_di
S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG, 1) != 0) {
// setgid on dirs but not files, 770. As of now, there are no files though
fprintf(LOGFILE, "Failed to secure the log_dir %s\n", task_log_dir);
+ free(task_log_dir);
return -1;
}
+ free(task_log_dir);
return 0;
}
@@ -668,10 +726,13 @@ int initialize_user(const char *user) {
* Function to prepare the job directories for the task JVM.
* We do the following:
* * sudo chown user:mapred -R taskTracker/$user/jobcache/$jobid
+ * * sudo chown user:mapred -R logs/userlogs/$jobid
* * if user is not $tt_user,
* * sudo chmod 2570 -R taskTracker/$user/jobcache/$jobid
+ * * sudo chmod 2570 -R logs/userlogs/$jobid
* * else // user is tt_user
* * sudo chmod 2770 -R taskTracker/$user/jobcache/$jobid
+ * * sudo chmod 2770 -R logs/userlogs/$jobid
* *
* * For any user, sudo chmod 2770 taskTracker/$user/jobcache/$jobid/work
*/
@@ -783,11 +844,32 @@ int initialize_job(const char *jobid, co
}
free(local_dir);
free(full_local_dir_str);
- cleanup();
+ int exit_code = 0;
if (failed) {
- return INITIALIZE_JOB_FAILED;
+ exit_code = INITIALIZE_JOB_FAILED;
+ goto cleanup;
}
- return 0;
+
+ char *log_dir = (char *) get_value(TT_LOG_DIR_KEY);
+ if (log_dir == NULL) {
+ fprintf(LOGFILE, "Log directory is not configured.\n");
+ exit_code = INVALID_TT_LOG_DIR;
+ goto cleanup;
+ }
+
+ if (prepare_job_logs(log_dir, jobid, permissions) != 0) {
+ fprintf(LOGFILE, "Couldn't prepare job logs directory %s for %s.\n",
+ log_dir, jobid);
+ exit_code = PREPARE_JOB_LOGS_FAILED;
+ }
+
+ cleanup:
+ // free configurations
+ cleanup();
+ if (log_dir != NULL) {
+ free(log_dir);
+ }
+ return exit_code;
}
/**
@@ -891,7 +973,7 @@ int initialize_task(const char *jobid, c
goto cleanup;
}
- if (prepare_task_logs(log_dir, taskid) != 0) {
+ if (prepare_task_logs(log_dir, jobid, taskid) != 0) {
fprintf(LOGFILE, "Couldn't prepare task logs directory %s for %s.\n",
log_dir, taskid);
exit_code = PREPARE_TASK_LOGS_FAILED;
Modified: hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/task-controller.h?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h Tue Mar 9 10:56:44 2010
@@ -73,7 +73,8 @@ enum errorcodes {
UNABLE_TO_EXECUTE_DEBUG_SCRIPT, //21
INVALID_CONF_DIR, //22
UNABLE_TO_BUILD_PATH, //23
- INVALID_TASKCONTROLLER_PERMISSIONS //24
+ INVALID_TASKCONTROLLER_PERMISSIONS, //24
+ PREPARE_JOB_LOGS_FAILED, //25
};
#define USER_DIR_PATTERN "%s/taskTracker/%s"
@@ -86,7 +87,9 @@ enum errorcodes {
#define JOB_DIR_TO_ATTEMPT_DIR_PATTERN "%s/%s"
-#define ATTEMPT_LOG_DIR_PATTERN "%s/userlogs/%s"
+#define JOB_LOG_DIR_PATTERN "%s/userlogs/%s"
+
+#define ATTEMPT_LOG_DIR_PATTERN JOB_LOG_DIR_PATTERN"/%s"
#define TASK_SCRIPT_PATTERN "%s/%s/taskjvm.sh"
Modified: hadoop/mapreduce/trunk/src/c++/task-controller/tests/test-task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/tests/test-task-controller.c?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/tests/test-task-controller.c (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/tests/test-task-controller.c Tue Mar 9 10:56:44 2010
@@ -171,13 +171,26 @@ void test_get_task_launcher_file() {
assert(ret == 0);
}
+void test_get_job_log_dir() {
+ char *logdir = (char *) get_job_log_dir("/tmp/testing",
+ "job_200906101234_0001");
+ printf("logdir obtained is %s\n", logdir);
+ int ret = 0;
+ if (strcmp(logdir, "/tmp/testing/userlogs/job_200906101234_0001") != 0) {
+ ret = -1;
+ }
+ free(logdir);
+ assert(ret == 0);
+}
+
void test_get_task_log_dir() {
char *logdir = (char *) get_task_log_dir("/tmp/testing",
- "attempt_200906112028_0001_m_000000_0");
+ "job_200906101234_0001", "attempt_200906112028_0001_m_000000_0");
printf("logdir obtained is %s\n", logdir);
int ret = 0;
if (strcmp(logdir,
- "/tmp/testing/userlogs/attempt_200906112028_0001_m_000000_0") != 0) {
+ "/tmp/testing/userlogs/job_200906101234_0001/attempt_200906112028_0001_m_000000_0")
+ != 0) {
ret = -1;
}
free(logdir);
@@ -203,6 +216,9 @@ int main(int argc, char **argv) {
printf("\nTesting get_task_launcher_file()\n");
test_get_task_launcher_file();
+ printf("\nTesting get_job_log_dir()\n");
+ test_get_job_log_dir();
+
printf("\nTesting get_task_log_dir()\n");
test_get_task_log_dir();
Modified: hadoop/mapreduce/trunk/src/java/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/mapred-default.xml?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/mapred-default.xml (original)
+++ hadoop/mapreduce/trunk/src/java/mapred-default.xml Tue Mar 9 10:56:44 2010
@@ -672,10 +672,10 @@
</property>
<property>
- <name>mapreduce.task.userlog.retain.hours</name>
+ <name>mapreduce.job.userlog.retain.hours</name>
<value>24</value>
<description>The maximum time, in hours, for which the user-logs are to be
- retained.
+ retained after the job completion.
</description>
</property>
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java Tue Mar 9 10:56:44 2010
@@ -193,7 +193,6 @@ class Child {
numTasksToExecute = job.getNumTasksToExecutePerJvm();
assert(numTasksToExecute != 0);
- TaskLog.cleanup(job.getInt(JobContext.TASK_LOG_RETAIN_HOURS, 24));
task.setConf(job);
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java Tue Mar 9 10:56:44 2010
@@ -22,7 +22,6 @@ import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.File;
-import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -34,11 +33,13 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.util.ProcessTree;
import org.apache.hadoop.util.Shell;
import org.apache.log4j.Appender;
@@ -50,6 +51,7 @@ import org.apache.log4j.Logger;
* This class uses the system property <code>hadoop.log.dir</code>.
*
*/
+@InterfaceAudience.Private
public class TaskLog {
private static final Log LOG =
LogFactory.getLog(TaskLog.class);
@@ -71,7 +73,7 @@ public class TaskLog {
}
public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
- return new File(getBaseDir(taskid.toString()), filter.toString());
+ return new File(getAttemptDir(taskid.toString()), filter.toString());
}
public static File getRealTaskLogFileLocation(TaskAttemptID taskid,
LogName filter) {
@@ -82,7 +84,7 @@ public class TaskLog {
LOG.error("getTaskLogFileDetail threw an exception " + ie);
return null;
}
- return new File(getBaseDir(l.location), filter.toString());
+ return new File(getAttemptDir(l.location), filter.toString());
}
private static class LogFileDetail {
final static String LOCATION = "LOG_DIR:";
@@ -118,7 +120,7 @@ public class TaskLog {
//to be associated with each task attempt since jvm reuse is disabled
//when profiling/debugging is enabled
if (filter.equals(LogName.DEBUGOUT) || filter.equals(LogName.PROFILE)) {
- l.length = new File(getBaseDir(l.location), filter.toString()).length();
+ l.length = new File(getAttemptDir(l.location), filter.toString()).length();
l.start = 0;
fis.close();
return l;
@@ -140,7 +142,7 @@ public class TaskLog {
}
private static File getTmpIndexFile(String taskid) {
- return new File(getBaseDir(taskid), "log.tmp");
+ return new File(getAttemptDir(taskid), "log.tmp");
}
public static File getIndexFile(String taskid) {
return getIndexFile(taskid, false);
@@ -148,9 +150,9 @@ public class TaskLog {
public static File getIndexFile(String taskid, boolean isCleanup) {
if (isCleanup) {
- return new File(getBaseDir(taskid), "log.index.cleanup");
+ return new File(getAttemptDir(taskid), "log.index.cleanup");
} else {
- return new File(getBaseDir(taskid), "log.index");
+ return new File(getAttemptDir(taskid), "log.index");
}
}
@@ -158,8 +160,9 @@ public class TaskLog {
return System.getProperty("hadoop.log.dir");
}
- static File getBaseDir(String taskid) {
- return new File(LOG_DIR, taskid);
+ static File getAttemptDir(String taskid) {
+ return new File(getJobDir(TaskAttemptID.forName(taskid).getJobID()),
+ taskid);
}
private static long prevOutLength;
private static long prevErrLength;
@@ -271,38 +274,6 @@ public class TaskLog {
}
}
- private static class TaskLogsPurgeFilter implements FileFilter {
- long purgeTimeStamp;
-
- TaskLogsPurgeFilter(long purgeTimeStamp) {
- this.purgeTimeStamp = purgeTimeStamp;
- }
-
- public boolean accept(File file) {
- LOG.debug("PurgeFilter - file: " + file + ", mtime: " + file.lastModified() + ", purge: " + purgeTimeStamp);
- return file.lastModified() < purgeTimeStamp;
- }
- }
- /**
- * Purge old user logs.
- *
- * @throws IOException
- */
- public static synchronized void cleanup(int logsRetainHours
- ) throws IOException {
- // Purge logs of tasks on this tasktracker if their
- // mtime has exceeded "mapred.task.log.retain" hours
- long purgeTimeStamp = System.currentTimeMillis() -
- (logsRetainHours*60L*60*1000);
- File[] oldTaskLogs = LOG_DIR.listFiles
- (new TaskLogsPurgeFilter(purgeTimeStamp));
- if (oldTaskLogs != null) {
- for (int i=0; i < oldTaskLogs.length; ++i) {
- FileUtil.fullyDelete(oldTaskLogs[i]);
- }
- }
- }
-
static class Reader extends InputStream {
private long bytesRemaining;
private FileInputStream file;
@@ -341,7 +312,7 @@ public class TaskLog {
start += fileDetail.start;
end += fileDetail.start;
bytesRemaining = end - start;
- file = new FileInputStream(new File(getBaseDir(fileDetail.location),
+ file = new FileInputStream(new File(getAttemptDir(fileDetail.location),
kind.toString()));
// skip upto start
long pos = 0;
@@ -648,4 +619,14 @@ public class TaskLog {
return LOG_DIR;
}
+ /**
+ * Get the user log directory for the job jobid.
+ *
+ * @param jobid
+ * @return user log directory for the job
+ */
+ public static File getJobDir(JobID jobid) {
+ return new File(getUserLogDir(), jobid.toString());
+ }
+
} // TaskLog
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java Tue Mar 9 10:56:44 2010
@@ -141,7 +141,7 @@ public class TaskLogServlet extends Http
*/
static Configuration getConfFromJobACLsFile(String attemptIdStr) {
Configuration conf = new Configuration(false);
- conf.addResource(new Path(TaskLog.getBaseDir(attemptIdStr).toString(),
+ conf.addResource(new Path(TaskLog.getAttemptDir(attemptIdStr).toString(),
TaskRunner.jobACLsFile));
return conf;
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Mar 9 10:56:44 2010
@@ -257,7 +257,7 @@ public class TaskTracker
int workerThreads;
CleanupQueue directoryCleanupThread;
volatile JvmManager jvmManager;
-
+ UserLogCleaner taskLogCleanupThread;
private TaskMemoryManagerThread taskMemoryManager;
private boolean taskMemoryManagerEnabled = true;
private long totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
@@ -709,6 +709,9 @@ public class TaskTracker
setIndexCache(new IndexCache(this.fConf));
+ //clear old user logs
+ taskLogCleanupThread.clearOldUserLogs(this.fConf);
+
mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots);
reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);
mapLauncher.start();
@@ -943,8 +946,7 @@ public class TaskTracker
new LocalDirAllocator(MRConfig.LOCAL_DIR);
// intialize the job directory
- @SuppressWarnings("unchecked")
- private void localizeJob(TaskInProgress tip
+ RunningJob localizeJob(TaskInProgress tip
) throws IOException, InterruptedException {
Task t = tip.getTask();
JobID jobId = t.getJobID();
@@ -957,6 +959,8 @@ public class TaskTracker
if (!rjob.localized) {
JobConf localJobConf = localizeJobFiles(t, rjob);
+ // initialize job log directory
+ initializeJobLogDir(jobId);
// Now initialize the job via task-controller so as to set
// ownership/permissions of jars, job-work-dir. Note that initializeJob
@@ -974,7 +978,7 @@ public class TaskTracker
rjob.localized = true;
}
}
- launchTaskForJob(tip, new JobConf(rjob.jobConf));
+ return rjob;
}
private FileSystem getFS(final Path filePath, JobID jobId,
@@ -1056,6 +1060,14 @@ public class TaskTracker
return localJobConf;
}
+ // create job userlog dir
+ void initializeJobLogDir(JobID jobId) {
+ // remove it from tasklog cleanup thread first,
+ // it might be added there because of tasktracker reinit or restart
+ taskLogCleanupThread.unmarkJobFromLogDeletion(jobId);
+ localizer.initializeJobLogDir(jobId);
+ }
+
/**
* Download the job configuration file from the FS.
*
@@ -1263,7 +1275,8 @@ public class TaskTracker
server.start();
this.httpPort = server.getPort();
checkJettyPort(httpPort);
-
+ // create task log cleanup thread
+ setTaskLogCleanupThread(new UserLogCleaner(fConf));
// Initialize the jobACLSManager
jobACLsManager = new TaskTrackerJobACLsManager(this);
initialize();
@@ -1282,6 +1295,9 @@ public class TaskTracker
taskCleanupThread.setDaemon(true);
taskCleanupThread.start();
directoryCleanupThread = new CleanupQueue();
+ // start tasklog cleanup thread
+ taskLogCleanupThread.setDaemon(true);
+ taskLogCleanupThread.start();
}
// only used by tests
@@ -1293,6 +1309,14 @@ public class TaskTracker
return directoryCleanupThread;
}
+ UserLogCleaner getTaskLogCleanupThread() {
+ return this.taskLogCleanupThread;
+ }
+
+ void setTaskLogCleanupThread(UserLogCleaner t) {
+ this.taskLogCleanupThread = t;
+ }
+
void setIndexCache(IndexCache cache) {
this.indexCache = cache;
}
@@ -1808,7 +1832,7 @@ public class TaskTracker
* @param action The action with the job
* @throws IOException
*/
- private synchronized void purgeJob(KillJobAction action) throws IOException {
+ synchronized void purgeJob(KillJobAction action) throws IOException {
JobID jobId = action.getJobID();
LOG.info("Received 'KillJobAction' for job: " + jobId);
RunningJob rjob = null;
@@ -1833,6 +1857,11 @@ public class TaskTracker
if (!rjob.keepJobFiles) {
removeJobFiles(rjob.jobConf.getUser(), rjob.getJobID().toString());
}
+ // add job to taskLogCleanupThread
+ long now = System.currentTimeMillis();
+ taskLogCleanupThread.markJobLogsForDeletion(now, rjob.jobConf,
+ rjob.jobid);
+
// Remove this job
rjob.tasks.clear();
}
@@ -2186,7 +2215,8 @@ public class TaskTracker
*/
void startNewTask(TaskInProgress tip) {
try {
- localizeJob(tip);
+ RunningJob rjob = localizeJob(tip);
+ launchTaskForJob(tip, new JobConf(rjob.jobConf));
} catch (Throwable e) {
String msg = ("Error initializing " + tip.getTask().getTaskID() +
":\n" + StringUtils.stringifyException(e));
@@ -3262,6 +3292,10 @@ public class TaskTracker
FetchStatus getFetchStatus() {
return f;
}
+
+ JobConf getJobConf() {
+ return jobConf;
+ }
}
/**
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/UserLogCleaner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/UserLogCleaner.java?rev=920793&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/UserLogCleaner.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/UserLogCleaner.java Tue Mar 9 10:56:44 2010
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
+
+/**
+ * This is a thread in TaskTracker to cleanup user logs.
+ *
+ * Responsibilities of this thread include:
+ * <ol>
+ * <li>Removing old user logs</li>
+ * </ol>
+ */
+@InterfaceAudience.Private
+class UserLogCleaner extends Thread {
+ private static final Log LOG = LogFactory.getLog(UserLogCleaner.class);
+ static final int DEFAULT_USER_LOG_RETAIN_HOURS = 24; // 1 day
+ static final long DEFAULT_THREAD_SLEEP_TIME = 1000 * 60 * 60; // 1 hour
+
+ private Map<JobID, Long> completedJobs = Collections
+ .synchronizedMap(new HashMap<JobID, Long>());
+ private final long threadSleepTime;
+ private MRAsyncDiskService logAsyncDisk;
+ private Clock clock;
+
+ UserLogCleaner(Configuration conf) throws IOException {
+ threadSleepTime = conf.getLong(TTConfig.TT_USERLOGCLEANUP_SLEEPTIME,
+ DEFAULT_THREAD_SLEEP_TIME);
+ logAsyncDisk = new MRAsyncDiskService(FileSystem.getLocal(conf), TaskLog
+ .getUserLogDir().toString());
+ setClock(new Clock());
+ }
+
+ void setClock(Clock clock) {
+ this.clock = clock;
+ }
+
+ Clock getClock() {
+ return this.clock;
+ }
+
+ @Override
+ public void run() {
+ // This thread wakes up after every threadSleepTime interval
+ // and deletes if there are any old logs.
+ while (true) {
+ try {
+ // sleep
+ Thread.sleep(threadSleepTime);
+ processCompletedJobs();
+ } catch (Throwable e) {
+ LOG.warn(getClass().getSimpleName()
+ + " encountered an exception while monitoring :", e);
+ LOG.info("Ingoring the exception and continuing monitoring.");
+ }
+ }
+ }
+
+ void processCompletedJobs() throws IOException {
+ long now = clock.getTime();
+ // iterate through completedJobs and remove old logs.
+ synchronized (completedJobs) {
+ Iterator<Entry<JobID, Long>> completedJobIter = completedJobs.entrySet()
+ .iterator();
+ while (completedJobIter.hasNext()) {
+ Entry<JobID, Long> entry = completedJobIter.next();
+ // see if the job is old enough
+ if (entry.getValue().longValue() <= now) {
+ // add the job logs directory for deletion
+ deleteLogPath(TaskLog.getJobDir(entry.getKey()).getAbsolutePath());
+ completedJobIter.remove();
+ }
+ }
+ }
+ }
+
+ /**
+ * Clears all the logs in userlog directory.
+ *
+ * Adds the job directories for deletion with default retain hours. Deletes
+ * all other directories, if any. This is usually called on reinit/restart of
+ * the TaskTracker
+ *
+ * @param conf
+ * @throws IOException
+ */
+ void clearOldUserLogs(Configuration conf) throws IOException {
+ File userLogDir = TaskLog.getUserLogDir();
+ if (userLogDir.exists()) {
+ String[] logDirs = userLogDir.list();
+ if (logDirs.length > 0) {
+ // mark the job log dirs for deletion by this cleanup thread.
+ long now = clock.getTime();
+ for (String logDir : logDirs) {
+ if (logDir.equals(logAsyncDisk.TOBEDELETED)) {
+ // skip this
+ continue;
+ }
+ JobID jobid = null;
+ try {
+ jobid = JobID.forName(logDir);
+ } catch (IllegalArgumentException ie) {
+ // if the directory is not a jobid, delete it immediately
+ deleteLogPath(new File(userLogDir, logDir).getAbsolutePath());
+ continue;
+ }
+ // add the job log directory with default retain hours, if it is not
+ // already added
+ if (!completedJobs.containsKey(jobid)) {
+ markJobLogsForDeletion(now, conf, jobid);
+ }
+ }
+ }
+ }
+ }
+
+ private int getUserlogRetainMillis(Configuration conf) {
+ return (conf == null ? UserLogCleaner.DEFAULT_USER_LOG_RETAIN_HOURS
+ : conf.getInt(JobContext.USER_LOG_RETAIN_HOURS,
+ UserLogCleaner.DEFAULT_USER_LOG_RETAIN_HOURS)) * 1000 * 60 * 60;
+ }
+
+ /**
+ * Adds job user-log directory to cleanup thread to delete logs after user-log
+ * retain hours.
+ *
+ * If the configuration is null or user-log retain hours is not configured, it
+ * is deleted after
+ * {@value UserLogCleaner#DEFAULT_USER_LOG_RETAIN_HOURS} hours.
+ *
+ * @param jobCompletionTime
+ * job completion time in millis
+ * @param conf
+ * The configuration from which user-log retain hours should be read
+ * @param jobid
+ * JobID for which user logs should be deleted
+ */
+ public void markJobLogsForDeletion(long jobCompletionTime, Configuration conf,
+ JobID jobid) {
+ long retainTimeStamp = jobCompletionTime + (getUserlogRetainMillis(conf));
+ LOG.info("Adding " + jobid + " for user-log deletion with retainTimeStamp:"
+ + retainTimeStamp);
+ completedJobs.put(jobid, Long.valueOf(retainTimeStamp));
+ }
+
+ /**
+ * Remove job from user log deletion.
+ *
+ * @param jobid
+ */
+ public void unmarkJobFromLogDeletion(JobID jobid) {
+ if (completedJobs.remove(jobid) != null) {
+ LOG.info("Removing " + jobid + " from user-log deletion");
+ }
+ }
+
+ /**
+ * Deletes the log path.
+ *
+ * This path will be removed immediately through {@link MRAsyncDiskService}
+ *
+ * @param logPath
+ * @throws IOException
+ */
+ private void deleteLogPath(String logPath) throws IOException {
+ LOG.info("Deleting user log path " + logPath);
+ logAsyncDisk.moveAndDeleteAbsolutePath(logPath);
+ }
+}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Tue Mar 9 10:56:44 2010
@@ -113,6 +113,8 @@ public interface JobContext {
"mapreduce.job.cache.archives.visibilities";
public static final String CACHE_SYMLINK =
"mapreduce.job.cache.symlink.create";
+ public static final String USER_LOG_RETAIN_HOURS =
+ "mapreduce.job.userlog.retain.hours";
public static final String IO_SORT_FACTOR =
"mapreduce.task.io.sort.factor";
@@ -143,8 +145,6 @@ public interface JobContext {
public static final String TASK_OUTPUT_DIR = "mapreduce.task.output.dir";
public static final String TASK_USERLOG_LIMIT =
"mapreduce.task.userlog.limit.kb";
- public static final String TASK_LOG_RETAIN_HOURS =
- "mapred.task.userlog.retain.hours";
public static final String MAP_SORT_SPILL_PERCENT =
"mapreduce.map.sort.spill.percent";
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java Tue Mar 9 10:56:44 2010
@@ -28,10 +28,11 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapred.TaskTracker;
import org.apache.hadoop.mapred.TaskController.InitializationContext;
+import org.apache.hadoop.mapreduce.JobID;
/**
*
@@ -358,4 +359,22 @@ public class Localizer {
+ attemptId.toString());
}
}
+
+ /**
+ * Create the job user-log directory if it is missing and set its permissions.
+ * If the directory cannot be created, a warning is logged and nothing else is done.
+ * @param jobId id of the job whose log directory (TaskLog.getJobDir) is set up
+ */
+ public void initializeJobLogDir(JobID jobId) {
+ File jobUserLogDir = TaskLog.getJobDir(jobId);
+ if (!jobUserLogDir.exists()) {
+ boolean ret = jobUserLogDir.mkdirs();
+ if (!ret) {
+ LOG.warn("Could not create job user log directory: " + jobUserLogDir);
+ return;
+ }
+ }
+ Localizer.PermissionsHandler.setPermissions(jobUserLogDir,
+ Localizer.PermissionsHandler.sevenZeroZero);
+ }
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java Tue Mar 9 10:56:44 2010
@@ -88,4 +88,6 @@ public interface TTConfig extends MRConf
"mapreduce.tasktracker.keytab.file";
public static final String TT_GROUP =
"mapreduce.tasktracker.group";
+ public static final String TT_USERLOGCLEANUP_SLEEPTIME =
+ "mapreduce.tasktracker.userlogcleanup.sleeptime";
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java Tue Mar 9 10:56:44 2010
@@ -292,7 +292,7 @@ public class ConfigUtil {
Configuration.addDeprecation("mapred.userlog.limit.kb",
new String[] {JobContext.TASK_USERLOG_LIMIT});
Configuration.addDeprecation("mapred.userlog.retain.hours",
- new String[] {JobContext.TASK_LOG_RETAIN_HOURS});
+ new String[] {JobContext.USER_LOG_RETAIN_HOURS});
Configuration.addDeprecation("mapred.task.profile.params",
new String[] {JobContext.TASK_PROFILE_PARAMS});
Configuration.addDeprecation("io.sort.spill.percent",
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java Tue Mar 9 10:56:44 2010
@@ -67,7 +67,7 @@ public class MRAsyncDiskService {
* @param localFileSystem The localFileSystem used for deletions.
* @param volumes The roots of the file system volumes.
*/
- public MRAsyncDiskService(FileSystem localFileSystem, String[] volumes)
+ public MRAsyncDiskService(FileSystem localFileSystem, String... volumes)
throws IOException {
this.volumes = new String[volumes.length];
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java Tue Mar 9 10:56:44 2010
@@ -206,6 +206,11 @@ public class TestLocalizationWithLinuxTa
checkFilePermissions(file.toUri().getPath(), expectedFilePerms, task
.getUser(), ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
}
+
+ // check job user-log directory permissions
+ File jobLogDir = TaskLog.getJobDir(jobId);
+ checkFilePermissions(jobLogDir.toString(), expectedDirPerms, task.getUser(),
+ ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
}
@Override
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Tue Mar 9 10:56:44 2010
@@ -84,6 +84,7 @@ public class TestTaskTrackerLocalization
protected Path attemptWorkDir;
protected File[] attemptLogFiles;
protected JobConf localizedTaskConf;
+ private TaskInProgress tip;
/**
* Dummy method in this base class. Only derived classes will define this
@@ -122,7 +123,17 @@ public class TestTaskTrackerLocalization
trackerFConf.setStrings(MRConfig.LOCAL_DIR, localDirs);
// Create the job configuration file. Same as trackerConf in this test.
- Job job = new Job(trackerFConf);
+ Configuration jobConf = new Configuration(trackerFConf);
+ // Set job view ACLs in conf so that validation of contents of jobACLsFile
+ // can be done against this value. Have both users and groups
+ String jobViewACLs = "user1,user2, group1,group2";
+ jobConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACLs);
+
+ jobConf.setInt(JobContext.USER_LOG_RETAIN_HOURS, 0);
+
+ Job job = new Job(jobConf);
+ String jtIdentifier = "200907202331";
+ jobId = new JobID(jtIdentifier, 1);
// JobClient uploads the job jar to the file system and sets it in the
// jobConf.
@@ -131,9 +142,15 @@ public class TestTaskTrackerLocalization
// JobClient uploads the jobConf to the file system.
File jobConfFile = uploadJobConf(job.getConfiguration());
+ // create jobTokens file
+ uploadJobTokensFile();
+
// Set up the TaskTracker
tracker = new TaskTracker();
tracker.setConf(trackerFConf);
+ tracker.setIndexCache(new IndexCache(trackerFConf));
+ tracker.setTaskLogCleanupThread(new UserLogCleaner(trackerFConf));
+ tracker.setTaskMemoryManagerEnabledFlag();
// for test case system FS is the local FS
tracker.systemFS = FileSystem.getLocal(trackerFConf);
@@ -143,13 +160,6 @@ public class TestTaskTrackerLocalization
taskTrackerUGI = UserGroupInformation.getCurrentUser();
// Set up the task to be localized
- String jtIdentifier = "200907202331";
- jobId = new JobID(jtIdentifier, 1);
-
- TaskTracker.RunningJob rjob = new TaskTracker.RunningJob(jobId);
- rjob.ugi = UserGroupInformation.getCurrentUser();
- tracker.runningJobs.put(jobId, rjob);
-
taskId =
new TaskAttemptID(jtIdentifier, jobId.getId(), TaskType.MAP, 1, 0);
task =
@@ -157,10 +167,6 @@ public class TestTaskTrackerLocalization
task.setConf(job.getConfiguration()); // Set conf. Set user name in particular.
task.setUser(UserGroupInformation.getCurrentUser().getUserName());
- // create jobTokens file
- uploadJobTokensFile();
-
-
taskController = new DefaultTaskController();
taskController.setConf(trackerFConf);
taskController.setup();
@@ -168,6 +174,10 @@ public class TestTaskTrackerLocalization
tracker.setTaskController(taskController);
tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
taskController));
+
+ // mimic register task
+ // create the tip
+ tip = tracker.new TaskInProgress(task, trackerFConf);
}
/**
@@ -409,24 +419,8 @@ public class TestTaskTrackerLocalization
if (!canRun()) {
return;
}
- tracker.getLocalizer().initializeUserDirs(task.getUser());
-
- // /////////// The main method being tested
- localizedJobConf = tracker.localizeJobFiles(task,
- new TaskTracker.RunningJob(task.getJobID()));
- // ///////////
-
- // Now initialize the job via task-controller so as to set
- // ownership/permissions of jars, job-work-dir
- JobInitializationContext context = new JobInitializationContext();
- context.jobid = jobId;
- context.user = task.getUser();
- context.workDir =
- new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
-
- // /////////// The method being tested
- taskController.initializeJob(context);
- // ///////////
+ TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+ localizedJobConf = rjob.getJobConf();
checkJobLocalization();
}
@@ -494,6 +488,13 @@ public class TestTaskTrackerLocalization
assertTrue(
"mapreduce.job.jar is not set properly to the target users directory : "
+ localizedJobJar, mapredJarFlag);
+
+ // check job user-log directory permissions
+ File jobLogDir = TaskLog.getJobDir(jobId);
+ assertTrue("job log directory " + jobLogDir + " does not exist!", jobLogDir
+ .exists());
+ checkFilePermissions(jobLogDir.toString(), "drwx------", task.getUser(),
+ taskTrackerUGI.getGroupNames()[0]);
}
/**
@@ -506,25 +507,9 @@ public class TestTaskTrackerLocalization
if (!canRun()) {
return;
}
- tracker.getLocalizer().initializeUserDirs(task.getUser());
- localizedJobConf = tracker.localizeJobFiles(task,
- new TaskTracker.RunningJob(task.getJobID()));
-
- // Set job view ACLs in conf sothat validation of contents of jobACLsFile
- // can be done against this value. Have both users and groups
- String jobViewACLs = "user1,user2, group1,group2";
- localizedJobConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACLs);
-
- // Now initialize the job via task-controller so as to set
- // ownership/permissions of jars, job-work-dir
- JobInitializationContext jobContext = new JobInitializationContext();
- jobContext.jobid = jobId;
- jobContext.user = task.getUser();
- jobContext.workDir =
- new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
- taskController.initializeJob(jobContext);
+ TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+ localizedJobConf = rjob.getJobConf();
- TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
tip.setJobConf(localizedJobConf);
// ////////// The central method being tested
@@ -605,9 +590,7 @@ public class TestTaskTrackerLocalization
.getPath(), "tmp").exists());
// Make sure that the logs are setup properly
- File logDir =
- new File(HADOOP_LOG_DIR, TaskLog.USERLOGS_DIR_NAME + Path.SEPARATOR
- + task.getTaskID().toString());
+ File logDir = TaskLog.getAttemptDir(taskId.toString());
assertTrue("task's log dir " + logDir.toString() + " doesn't exist!",
logDir.exists());
checkFilePermissions(logDir.getAbsolutePath(), "drwx------", task
@@ -761,21 +744,12 @@ public class TestTaskTrackerLocalization
private void testTaskCleanup(boolean needCleanup, boolean jvmReuse)
throws Exception {
// Localize job and localize task.
- tracker.getLocalizer().initializeUserDirs(task.getUser());
- localizedJobConf = tracker.localizeJobFiles(task,
- new TaskTracker.RunningJob(task.getJobID()));
+ TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
+ TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+ localizedJobConf = rjob.getJobConf();
if (jvmReuse) {
localizedJobConf.setNumTasksToExecutePerJvm(2);
}
- // Now initialize the job via task-controller so as to set
- // ownership/permissions of jars, job-work-dir
- JobInitializationContext jobContext = new JobInitializationContext();
- jobContext.jobid = jobId;
- jobContext.user = localizedJobConf.getUser();
- jobContext.workDir =
- new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
- taskController.initializeJob(jobContext);
- TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
tip.setJobConf(localizedJobConf);
tip.localizeTask(task);
Path workDir =
@@ -829,21 +803,16 @@ public class TestTaskTrackerLocalization
*/
private void verifyUserLogsCleanup()
throws IOException {
- Path logDir =
- new Path(HADOOP_LOG_DIR.toURI().getPath(), TaskLog.USERLOGS_DIR_NAME
- + Path.SEPARATOR + task.getTaskID().toString());
-
+ // verify user logs cleanup
+ File jobUserLogDir = TaskLog.getJobDir(jobId);
// Logs should be there before cleanup.
- assertTrue("Userlogs dir " + logDir + " is not presen as expected!!",
- tracker.getLocalFileSystem().exists(logDir));
-
- // ////////// Another being tested
- TaskLog.cleanup(-1); // -1 so as to move purgeTimeStamp to future and file
- // modification time behind retainTimeStatmp
- // //////////
-
+ assertTrue("Userlogs dir " + jobUserLogDir + " is not present as expected!!",
+ jobUserLogDir.exists());
+ tracker.purgeJob(new KillJobAction(jobId));
+ tracker.getTaskLogCleanupThread().processCompletedJobs();
+
// Logs should be gone after cleanup.
- assertFalse("Userlogs dir " + logDir + " is not deleted as expected!!",
- tracker.getLocalFileSystem().exists(logDir));
+ assertFalse("Userlogs dir " + jobUserLogDir + " is not deleted as expected!!",
+ jobUserLogDir.exists());
}
}
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java?rev=920793&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java Tue Mar 9 10:56:44 2010
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Test;
+
+public class TestUserLogCleanup {
+ private static String jtid = "test";
+ private static long ONE_HOUR = 1000 * 60 * 60;
+ private Localizer localizer;
+ private UserLogCleaner taskLogCleanupThread;
+ private TaskTracker tt;
+ private FakeClock myClock = new FakeClock();
+ private JobID jobid1 = new JobID(jtid, 1);
+ private JobID jobid2 = new JobID(jtid, 2);
+ private JobID jobid3 = new JobID(jtid, 3);
+ private JobID jobid4 = new JobID(jtid, 4);
+ private File foo = new File(TaskLog.getUserLogDir(), "foo");
+ private File bar = new File(TaskLog.getUserLogDir(), "bar");
+
+ public TestUserLogCleanup() throws IOException {
+ Configuration conf = new Configuration();
+ localizer = new Localizer(FileSystem.get(conf), conf
+ .getStrings(MRConfig.LOCAL_DIR), new DefaultTaskController());
+ taskLogCleanupThread = new UserLogCleaner(conf);
+ taskLogCleanupThread.setClock(myClock);
+ tt = new TaskTracker();
+ tt.setLocalizer(localizer);
+ tt.setTaskLogCleanupThread(taskLogCleanupThread);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ FileUtil.fullyDelete(TaskLog.getUserLogDir());
+ }
+
+ private File localizeJob(JobID jobid) throws IOException {
+ File jobUserlog = TaskLog.getJobDir(jobid);
+
+ // localize job log directory
+ tt.initializeJobLogDir(jobid);
+ assertTrue(jobUserlog + " directory is not created.", jobUserlog.exists());
+ return jobUserlog;
+ }
+
+ private void jobFinished(JobID jobid, int logRetainHours) {
+ Configuration jobconf = new Configuration();
+ jobconf.setInt(JobContext.USER_LOG_RETAIN_HOURS, logRetainHours);
+ taskLogCleanupThread.markJobLogsForDeletion(myClock.getTime(), jobconf,
+ jobid);
+ }
+
+ /**
+ * Tests job user-log directory deletion.
+ *
+ * Adds two jobs for log deletion: one with a one-hour retain period, the
+ * other with a two-hour retain period. After one hour, a call to
+ * UserLogCleaner.processCompletedJobs() must remove the job with the
+ * one-hour retain period and keep the other. After one more hour, the job
+ * with the two-hour retain period must be removed as well.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testJobLogCleanup() throws IOException {
+ File jobUserlog1 = localizeJob(jobid1);
+ File jobUserlog2 = localizeJob(jobid2);
+
+ // add job user log directory for deletion, with 2 hours for deletion
+ jobFinished(jobid1, 2);
+
+ // add the job for deletion with one hour as retain hours
+ jobFinished(jobid2, 1);
+
+ // remove old logs and see jobid1 is not removed and jobid2 is removed
+ myClock.advance(ONE_HOUR);
+ taskLogCleanupThread.processCompletedJobs();
+ assertTrue(jobUserlog1 + " got deleted", jobUserlog1.exists());
+ assertFalse(jobUserlog2 + " still exists.", jobUserlog2.exists());
+
+ myClock.advance(ONE_HOUR);
+ // remove old logs and see jobid1 is removed now
+ taskLogCleanupThread.processCompletedJobs();
+ assertFalse(jobUserlog1 + " still exists.", jobUserlog1.exists());
+ }
+
+ /**
+ * Tests user-log directory cleanup on a TT re-init with 3 hours as log
+ * retain hours for tracker.
+ *
+ * Adds job1 deletion before the re-init with 2 hour retain hours.
+ * Adds job2 for which there are no tasks/killJobAction after the re-init.
+ * Adds job3 for which there is localizeJob followed by killJobAction
+ * with 3 hours as retain hours.
+ * Adds job4 for which there are some tasks after the re-init.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testUserLogCleanup() throws IOException {
+ File jobUserlog1 = localizeJob(jobid1);
+ File jobUserlog2 = localizeJob(jobid2);
+ File jobUserlog3 = localizeJob(jobid3);
+ File jobUserlog4 = localizeJob(jobid4);
+ // create some files/dirs in userlog
+ foo.mkdirs();
+ bar.createNewFile();
+
+ // add the jobid1 for deletion with retainhours = 2
+ jobFinished(jobid1, 2);
+
+ // time is now 1.
+ myClock.advance(ONE_HOUR);
+
+ // mimic TaskTracker reinit
+ // clear userlog directory
+ // job directories will be added with 3 hours as retain hours. They will be
+ // deleted at time 4.
+ Configuration conf = new Configuration();
+ conf.setInt(JobContext.USER_LOG_RETAIN_HOURS, 3);
+ taskLogCleanupThread.clearOldUserLogs(conf);
+ assertFalse(foo.exists());
+ assertFalse(bar.exists());
+ assertTrue(jobUserlog1.exists());
+ assertTrue(jobUserlog2.exists());
+ assertTrue(jobUserlog3.exists());
+ assertTrue(jobUserlog4.exists());
+ assertTrue(new File(TaskLog.getUserLogDir(), MRAsyncDiskService.TOBEDELETED)
+ .exists());
+
+ myClock.advance(ONE_HOUR);
+ // time is now 2.
+ taskLogCleanupThread.processCompletedJobs();
+ assertFalse(jobUserlog1.exists());
+ assertTrue(jobUserlog2.exists());
+ assertTrue(jobUserlog3.exists());
+ assertTrue(jobUserlog4.exists());
+
+ // mimic localizeJob followed by KillJobAction for jobid3
+ // add the job for deletion with retainhours = 3.
+ // jobid3 should be deleted at time 5.
+ jobUserlog3 = localizeJob(jobid3);
+ jobFinished(jobid3, 3);
+
+ // mimic localizeJob for jobid4
+ jobUserlog4 = localizeJob(jobid4);
+
+ // do cleanup
+ myClock.advance(2 * ONE_HOUR);
+ // time is now 4.
+ taskLogCleanupThread.processCompletedJobs();
+
+ // jobid2 will be deleted
+ assertFalse(jobUserlog1.exists());
+ assertFalse(jobUserlog2.exists());
+ assertTrue(jobUserlog3.exists());
+ assertTrue(jobUserlog4.exists());
+
+ myClock.advance(ONE_HOUR);
+ // time is now 5.
+ // do cleanup again
+ taskLogCleanupThread.processCompletedJobs();
+
+ // jobid3 will be deleted
+ assertFalse(jobUserlog1.exists());
+ assertFalse(jobUserlog2.exists());
+ assertFalse(jobUserlog3.exists());
+ assertTrue(jobUserlog4.exists());
+ }
+
+ /**
+ * Tests user-log directory cleanup on a TT restart.
+ *
+ * Adds job1 deletion before the restart with 2 hour retain hours.
+ * Adds job2 for which there are no tasks/killJobAction after the restart.
+ * Adds job3 for which there is localizeJob followed by killJobAction after
+ * the restart with 3 hours retain hours.
+ * Adds job4 for which there are some tasks after the restart.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testUserLogCleanupAfterRestart() throws IOException {
+ File jobUserlog1 = localizeJob(jobid1);
+ File jobUserlog2 = localizeJob(jobid2);
+ File jobUserlog3 = localizeJob(jobid3);
+ File jobUserlog4 = localizeJob(jobid4);
+ // create some files/dirs in userlog
+ foo.mkdirs();
+ bar.createNewFile();
+
+ // add the jobid1 for deletion with retainhours = 2
+ jobFinished(jobid1, 2);
+
+ // time is now 1.
+ myClock.advance(ONE_HOUR);
+
+ // mimic TaskTracker restart
+ // clear userlog directory
+ // job directories will be added with 3 hours as retain hours.
+ Configuration conf = new Configuration();
+ conf.setInt(JobContext.USER_LOG_RETAIN_HOURS, 3);
+ taskLogCleanupThread = new UserLogCleaner(conf);
+ myClock = new FakeClock(); // clock is reset.
+ taskLogCleanupThread.setClock(myClock);
+ taskLogCleanupThread.clearOldUserLogs(conf);
+ tt.setTaskLogCleanupThread(taskLogCleanupThread);
+ assertFalse(foo.exists());
+ assertFalse(bar.exists());
+ assertTrue(jobUserlog1.exists());
+ assertTrue(jobUserlog2.exists());
+ assertTrue(jobUserlog3.exists());
+ assertTrue(jobUserlog4.exists());
+ assertTrue(new File(TaskLog.getUserLogDir(), MRAsyncDiskService.TOBEDELETED)
+ .exists());
+
+ myClock.advance(ONE_HOUR);
+ // time is now 1.
+ taskLogCleanupThread.processCompletedJobs();
+ assertTrue(jobUserlog1.exists());
+ assertTrue(jobUserlog2.exists());
+ assertTrue(jobUserlog3.exists());
+ assertTrue(jobUserlog4.exists());
+
+ // mimic localizeJob followed by KillJobAction for jobid3
+ // add the job for deletion with retainhours = 3.
+ // jobid3 should be deleted at time 4.
+ jobUserlog3 = localizeJob(jobid3);
+ jobFinished(jobid3, 3);
+
+ // mimic localizeJob for jobid4
+ jobUserlog4 = localizeJob(jobid4);
+
+ // do cleanup
+ myClock.advance(2 * ONE_HOUR);
+ // time is now 3.
+ taskLogCleanupThread.processCompletedJobs();
+
+ // jobid1 and jobid2 will be deleted
+ assertFalse(jobUserlog1.exists());
+ assertFalse(jobUserlog2.exists());
+ assertTrue(jobUserlog3.exists());
+ assertTrue(jobUserlog4.exists());
+
+ myClock.advance(ONE_HOUR);
+ // time is now 4.
+ // do cleanup again
+ taskLogCleanupThread.processCompletedJobs();
+
+ // jobid3 will be deleted
+ assertFalse(jobUserlog1.exists());
+ assertFalse(jobUserlog2.exists());
+ assertFalse(jobUserlog3.exists());
+ assertTrue(jobUserlog4.exists());
+ }
+}