You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:03:24 UTC
svn commit: r1077322 [2/2] - in
/hadoop/common/branches/branch-0.20-security-patches/src:
c++/task-controller/ c++/task-controller/tests/ mapred/
mapred/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapreduce/
mapred/org/apache/hadoop/mapreduce/se...
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsTruncater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsTruncater.java?rev=1077322&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsTruncater.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsTruncater.java Fri Mar 4 04:03:23 2011
@@ -0,0 +1,605 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
+import org.apache.hadoop.mapred.TaskLog.LogName;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapreduce.server.tasktracker.JVMInfo;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.JvmFinishedEvent;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.UserLogManager;
+import org.apache.hadoop.mapreduce.split.JobSplit;
+
+import org.junit.After;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verify the logs' truncation functionality.
+ */
+public class TestTaskLogsTruncater {
+
+ static final Log LOG = LogFactory.getLog(TestTaskLogsTruncater.class);
+
+ /**
+ * clean-up any stale directories after enabling writable permissions for all
+ * attempt-dirs.
+ *
+ * @throws IOException
+ */
+ @After
+ public void tearDown() throws IOException {
+ File logDir = TaskLog.getUserLogDir();
+ for (File attemptDir : logDir.listFiles()) {
+ attemptDir.setWritable(true);
+ FileUtil.fullyDelete(attemptDir);
+ }
+ }
+
+ void writeRealBytes(TaskAttemptID firstAttemptID,
+ TaskAttemptID attemptID, LogName logName, long numBytes, char data)
+ throws IOException {
+
+ File logFile = TaskLog.getTaskLogFile(firstAttemptID, logName);
+
+ LOG.info("Going to write " + numBytes + " real bytes to the log file "
+ + logFile);
+
+ if (!logFile.getParentFile().exists()
+ && !logFile.getParentFile().mkdirs()) {
+ throw new IOException("Couldn't create all ancestor dirs for "
+ + logFile);
+ }
+
+ File attemptDir = TaskLog.getAttemptDir(attemptID.toString());
+ if (!attemptDir.exists() && !attemptDir.mkdirs()) {
+ throw new IOException("Couldn't create all ancestor dirs for "
+ + logFile);
+ }
+
+ // Need to call up front to set currenttaskid.
+ TaskLog.syncLogs(firstAttemptID, attemptID);
+
+ FileWriter writer = new FileWriter(logFile, true);
+ for (long i = 0; i < numBytes; i++) {
+ writer.write(data);
+ }
+ writer.close();
+ TaskLog.syncLogs(firstAttemptID, attemptID);
+ LOG.info("Written " + numBytes + " real bytes to the log file "
+ + logFile);
+ }
+
+ private static Map<LogName, Long> getAllLogsFileLengths(
+ TaskAttemptID tid, boolean isCleanup) throws IOException {
+ Map<LogName, Long> allLogsFileLengths = new HashMap<LogName, Long>();
+
+ // If the index file doesn't exist, we cannot get log-file lengths. So set
+ // them to zero.
+ if (!TaskLog.getIndexFile(tid.toString(), isCleanup).exists()) {
+ for (LogName log : LogName.values()) {
+ allLogsFileLengths.put(log, Long.valueOf(0));
+ }
+ return allLogsFileLengths;
+ }
+
+ Map<LogName, LogFileDetail> logFilesDetails =
+ TaskLog.getAllLogsFileDetails(tid, isCleanup);
+ for (LogName log : logFilesDetails.keySet()) {
+ allLogsFileLengths.put(log,
+ Long.valueOf(logFilesDetails.get(log).length));
+ }
+ return allLogsFileLengths;
+ }
+
+ private Configuration setRetainSizes(long mapRetainSize,
+ long reduceRetainSize) {
+ Configuration conf = new Configuration();
+ conf.setLong(TaskLogsTruncater.MAP_USERLOG_RETAIN_SIZE, mapRetainSize);
+ conf.setLong(TaskLogsTruncater.REDUCE_USERLOG_RETAIN_SIZE, reduceRetainSize);
+ return conf;
+ }
+
+ /**
+ * Test cases which don't need any truncation of log-files. Without JVM-reuse.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testNoTruncationNeeded() throws IOException {
+ Configuration conf = setRetainSizes(1000L, 1000L);
+ UserLogManager logManager = new UtilsForTests.InLineUserLogManager(conf);
+
+ TaskID baseId = new TaskID();
+ int taskcount = 0;
+
+ TaskAttemptID attemptID = new TaskAttemptID(baseId, taskcount++);
+ Task task = new MapTask(null, attemptID, 0, new JobSplit.TaskSplitIndex(),
+ 0);
+
+ // Let the tasks write logs within retain-size
+ for (LogName log : LogName.values()) {
+ writeRealBytes(attemptID, attemptID, log, 500, 'H');
+ }
+ File logIndex = TaskLog.getIndexFile(attemptID.toString(), false);
+ long indexModificationTimeStamp = logIndex.lastModified();
+
+ File attemptDir = TaskLog.getAttemptDir(attemptID.toString());
+ assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
+ assertEquals("index file got modified", indexModificationTimeStamp,
+ logIndex.lastModified());
+
+ // Finish the task and the JVM too.
+ JVMInfo jvmInfo = new JVMInfo(attemptID, Arrays.asList(task));
+ logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
+
+ // There should be no truncation of the log-file.
+ assertTrue(attemptDir.exists());
+ assertEquals("index file got modified", indexModificationTimeStamp,
+ logIndex.lastModified());
+
+ Map<LogName, Long> logLengths = getAllLogsFileLengths(attemptID, false);
+ for (LogName log : LogName.values()) {
+ File logFile = TaskLog.getTaskLogFile(attemptID, log);
+ assertEquals(500, logFile.length());
+ // The index file should also be proper.
+ assertEquals(500, logLengths.get(log).longValue());
+ }
+
+ // truncate it once again
+ logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
+ assertEquals("index file got modified", indexModificationTimeStamp,
+ logIndex.lastModified());
+
+ logLengths = getAllLogsFileLengths(attemptID, false);
+ for (LogName log : LogName.values()) {
+ File logFile = TaskLog.getTaskLogFile(attemptID, log);
+ assertEquals(500, logFile.length());
+ // The index file should also be proper.
+ assertEquals(500, logLengths.get(log).longValue());
+ }
+ }
+
+ /**
+ * Test the disabling of truncation of log-file.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testDisabledLogTruncation() throws IOException {
+ // Anything less than 0 disables the truncation.
+ Configuration conf = setRetainSizes(-1L, -1L);
+ UserLogManager logManager = new UtilsForTests.InLineUserLogManager(conf);
+
+ TaskID baseId = new TaskID();
+ int taskcount = 0;
+
+ TaskAttemptID attemptID = new TaskAttemptID(baseId, taskcount++);
+ Task task = new MapTask(null, attemptID, 0, new JobSplit.TaskSplitIndex(),
+ 0);
+
+ // Let the tasks write some logs
+ for (LogName log : LogName.values()) {
+ writeRealBytes(attemptID, attemptID, log, 1500, 'H');
+ }
+
+ File attemptDir = TaskLog.getAttemptDir(attemptID.toString());
+ assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
+
+ // Finish the task and the JVM too.
+ JVMInfo jvmInfo = new JVMInfo(attemptID, Arrays.asList(task));
+ logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
+
+ // The log-file should not be truncated.
+ assertTrue(attemptDir.exists());
+ Map<LogName, Long> logLengths = getAllLogsFileLengths(attemptID, false);
+ for (LogName log : LogName.values()) {
+ File logFile = TaskLog.getTaskLogFile(attemptID, log);
+ assertEquals(1500, logFile.length());
+ // The index file should also be proper.
+ assertEquals(1500, logLengths.get(log).longValue());
+ }
+ }
+
+ /**
+ * Test the truncation of log-file when JVMs are not reused.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testLogTruncationOnFinishing() throws IOException {
+ Configuration conf = setRetainSizes(1000L, 1000L);
+ UserLogManager logManager = new UtilsForTests.InLineUserLogManager(conf);
+
+ TaskID baseId = new TaskID();
+ int taskcount = 0;
+
+ TaskAttemptID attemptID = new TaskAttemptID(baseId, taskcount++);
+ Task task = new MapTask(null, attemptID, 0, new JobSplit.TaskSplitIndex(),
+ 0);
+
+ // Let the tasks write logs more than retain-size
+ for (LogName log : LogName.values()) {
+ writeRealBytes(attemptID, attemptID, log, 1500, 'H');
+ }
+
+ File attemptDir = TaskLog.getAttemptDir(attemptID.toString());
+ assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
+
+ // Finish the task and the JVM too.
+ JVMInfo jvmInfo = new JVMInfo(attemptID, Arrays.asList(task));
+ logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
+
+ // The log-file should now be truncated.
+ assertTrue(attemptDir.exists());
+
+ Map<LogName, Long> logLengths = getAllLogsFileLengths(attemptID, false);
+ for (LogName log : LogName.values()) {
+ File logFile = TaskLog.getTaskLogFile(attemptID, log);
+ assertEquals(1000, logFile.length());
+ // The index file should also be proper.
+ assertEquals(1000, logLengths.get(log).longValue());
+ }
+
+ // truncate once again
+ logLengths = getAllLogsFileLengths(attemptID, false);
+ for (LogName log : LogName.values()) {
+ File logFile = TaskLog.getTaskLogFile(attemptID, log);
+ assertEquals(1000, logFile.length());
+ // The index file should also be proper.
+ assertEquals(1000, logLengths.get(log).longValue());
+ }
+ }
+
+ /**
+ * Test the truncation of log-file.
+ *
+ * It writes two log files and truncates one, does not truncate other.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testLogTruncation() throws IOException {
+ Configuration conf = setRetainSizes(1000L, 1000L);
+ UserLogManager logManager = new UtilsForTests.InLineUserLogManager(conf);
+
+ TaskID baseId = new TaskID();
+ int taskcount = 0;
+
+ TaskAttemptID attemptID = new TaskAttemptID(baseId, taskcount++);
+ Task task = new MapTask(null, attemptID, 0, new JobSplit.TaskSplitIndex(),
+ 0);
+
+ // Let the tasks write logs more than retain-size
+ writeRealBytes(attemptID, attemptID, LogName.SYSLOG, 1500, 'H');
+ writeRealBytes(attemptID, attemptID, LogName.STDERR, 500, 'H');
+
+ File attemptDir = TaskLog.getAttemptDir(attemptID.toString());
+ assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
+
+ // Finish the task and the JVM too.
+ JVMInfo jvmInfo = new JVMInfo(attemptID, Arrays.asList(task));
+ logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
+
+ // The log-file should now be truncated.
+ assertTrue(attemptDir.exists());
+
+ Map<LogName, Long> logLengths = getAllLogsFileLengths(attemptID, false);
+ File logFile = TaskLog.getTaskLogFile(attemptID, LogName.SYSLOG);
+ assertEquals(1000, logFile.length());
+ // The index file should also be proper.
+ assertEquals(1000, logLengths.get(LogName.SYSLOG).longValue());
+ logFile = TaskLog.getTaskLogFile(attemptID, LogName.STDERR);
+ assertEquals(500, logFile.length());
+ // The index file should also be proper.
+ assertEquals(500, logLengths.get(LogName.STDERR).longValue());
+
+ // truncate once again
+ logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
+ logLengths = getAllLogsFileLengths(attemptID, false);
+ logFile = TaskLog.getTaskLogFile(attemptID, LogName.SYSLOG);
+ assertEquals(1000, logFile.length());
+ // The index file should also be proper.
+ assertEquals(1000, logLengths.get(LogName.SYSLOG).longValue());
+ logFile = TaskLog.getTaskLogFile(attemptID, LogName.STDERR);
+ assertEquals(500, logFile.length());
+ // The index file should also be proper.
+ assertEquals(500, logLengths.get(LogName.STDERR).longValue());
+ }
+
+ /**
+ * Test the truncation of log-file when JVM-reuse is enabled.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testLogTruncationOnFinishingWithJVMReuse() throws IOException {
+ Configuration conf = setRetainSizes(150L, 150L);
+ UserLogManager logManager = new UtilsForTests.InLineUserLogManager(conf);
+
+ TaskID baseTaskID = new TaskID();
+ int attemptsCount = 0;
+
+ // Assuming the job's retain size is 150
+ TaskAttemptID attempt1 = new TaskAttemptID(baseTaskID, attemptsCount++);
+ Task task1 = new MapTask(null, attempt1, 0, new JobSplit.TaskSplitIndex(),
+ 0);
+
+ // Let the tasks write logs more than retain-size
+ writeRealBytes(attempt1, attempt1, LogName.SYSLOG, 200, 'A');
+
+ File attemptDir = TaskLog.getAttemptDir(attempt1.toString());
+ assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
+
+ // Start another attempt in the same JVM
+ TaskAttemptID attempt2 = new TaskAttemptID(baseTaskID, attemptsCount++);
+ Task task2 = new MapTask(null, attempt2, 0, new JobSplit.TaskSplitIndex(),
+ 0);
+ // Let attempt2 also write some logs
+ writeRealBytes(attempt1, attempt2, LogName.SYSLOG, 100, 'B');
+ // Start yet another attempt in the same JVM
+ TaskAttemptID attempt3 = new TaskAttemptID(baseTaskID, attemptsCount++);
+ Task task3 = new MapTask(null, attempt3, 0, new JobSplit.TaskSplitIndex(),
+ 0);
+ // Let attempt3 also write some logs
+ writeRealBytes(attempt1, attempt3, LogName.SYSLOG, 225, 'C');
+ // Finish the JVM.
+ JVMInfo jvmInfo = new JVMInfo(attempt1, Arrays.asList((new Task[] { task1,
+ task2, task3 })));
+ logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
+
+ // The log-file should now be truncated.
+ assertTrue(attemptDir.exists());
+ File logFile = TaskLog.getTaskLogFile(attempt1, LogName.SYSLOG);
+ assertEquals(400, logFile.length());
+ // The index files should also be proper.
+ assertEquals(150, getAllLogsFileLengths(attempt1, false).get(
+ LogName.SYSLOG).longValue());
+ assertEquals(100, getAllLogsFileLengths(attempt2, false).get(
+ LogName.SYSLOG).longValue());
+ assertEquals(150, getAllLogsFileLengths(attempt3, false).get(
+ LogName.SYSLOG).longValue());
+
+ // assert the data.
+ FileReader reader =
+ new FileReader(TaskLog.getTaskLogFile(attempt1, LogName.SYSLOG));
+ int ch, bytesRead = 0;
+ boolean dataValid = true;
+ while ((ch = reader.read()) != -1) {
+ bytesRead++;
+ if (bytesRead <= 150) {
+ if ((char) ch != 'A') {
+ LOG.warn("Truncation didn't happen properly. At "
+ + (bytesRead + 1) + "th byte, expected 'A' but found "
+ + (char) ch);
+ dataValid = false;
+ }
+ } else if (bytesRead <= 250) {
+ if ((char) ch != 'B') {
+ LOG.warn("Truncation didn't happen properly. At "
+ + (bytesRead + 1) + "th byte, expected 'B' but found "
+ + (char) ch);
+ dataValid = false;
+ }
+ } else if ((char) ch != 'C') {
+ LOG.warn("Truncation didn't happen properly. At " + (bytesRead + 1)
+ + "th byte, expected 'C' but found " + (char) ch);
+ dataValid = false;
+ }
+ }
+ assertTrue("Log-truncation didn't happen properly!", dataValid);
+
+ logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
+ assertEquals(400, logFile.length());
+ }
+
+ private static String TEST_ROOT_DIR =
+ new File(System.getProperty("test.build.data", "/tmp")).toURI().toString().replace(
+ ' ', '+');
+
+ private static String STDERR_LOG = "stderr log";
+ public static class LoggingMapper<K, V> extends IdentityMapper<K, V> {
+
+ public void map(K key, V val, OutputCollector<K, V> output,
+ Reporter reporter) throws IOException {
+ // Write lots of logs
+ for (int i = 0; i < 1000; i++) {
+ System.out.println("Lots of logs! Lots of logs! "
+ + "Waiting to be truncated! Lots of logs!");
+ }
+ // write some log into stderr
+ System.err.println(STDERR_LOG);
+ super.map(key, val, output, reporter);
+ }
+ }
+
+ /**
+ * Test logs monitoring with {@link MiniMRCluster}
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testLogsMonitoringWithMiniMR() throws IOException {
+
+ MiniMRCluster mr = null;
+ try {
+ JobConf clusterConf = new JobConf();
+ clusterConf.setLong(TaskLogsTruncater.MAP_USERLOG_RETAIN_SIZE, 10000L);
+ clusterConf.setLong(TaskLogsTruncater.REDUCE_USERLOG_RETAIN_SIZE, 10000L);
+ mr = new MiniMRCluster(1, "file:///", 3, null, null, clusterConf);
+
+ JobConf conf = mr.createJobConf();
+
+ Path inDir = new Path(TEST_ROOT_DIR + "/input");
+ Path outDir = new Path(TEST_ROOT_DIR + "/output");
+ FileSystem fs = FileSystem.get(conf);
+ if (fs.exists(outDir)) {
+ fs.delete(outDir, true);
+ }
+ if (!fs.exists(inDir)) {
+ fs.mkdirs(inDir);
+ }
+ String input = "The quick brown fox jumped over the lazy dog";
+ DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+ file.writeBytes(input);
+ file.close();
+
+ conf.setInputFormat(TextInputFormat.class);
+ conf.setOutputKeyClass(LongWritable.class);
+ conf.setOutputValueClass(Text.class);
+
+ FileInputFormat.setInputPaths(conf, inDir);
+ FileOutputFormat.setOutputPath(conf, outDir);
+ conf.setNumMapTasks(1);
+ conf.setNumReduceTasks(0);
+ conf.setMapperClass(LoggingMapper.class);
+
+ RunningJob job = JobClient.runJob(conf);
+ assertTrue(job.getJobState() == JobStatus.SUCCEEDED);
+ for (TaskCompletionEvent tce : job.getTaskCompletionEvents(0)) {
+ long length =
+ TaskLog.getTaskLogFile(tce.getTaskAttemptId(),
+ TaskLog.LogName.STDOUT).length();
+ assertTrue("STDOUT log file length for " + tce.getTaskAttemptId()
+ + " is " + length + " and not <=10000", length <= 10000);
+ if (tce.isMap) {
+ String stderr = TestMiniMRMapRedDebugScript.readTaskLog(
+ LogName.STDERR, tce.getTaskAttemptId(), false);
+ System.out.println("STDERR log:" + stderr);
+ assertTrue(stderr.length() > 0);
+ assertTrue(stderr.length() < 10000);
+ assertTrue(stderr.equals(STDERR_LOG));
+ }
+ }
+ } finally {
+ if (mr != null) {
+ mr.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Test the truncation of DEBUGOUT file by {@link TaskLogsTruncater}
+ * @throws IOException
+ */
+ @Test
+ public void testDebugLogsTruncationWithMiniMR() throws IOException {
+
+ MiniMRCluster mr = null;
+ try {
+ JobConf clusterConf = new JobConf();
+ clusterConf.setLong(TaskLogsTruncater.MAP_USERLOG_RETAIN_SIZE, 10000L);
+ clusterConf.setLong(TaskLogsTruncater.REDUCE_USERLOG_RETAIN_SIZE, 10000L);
+ mr = new MiniMRCluster(1, "file:///", 3, null, null, clusterConf);
+
+ JobConf conf = mr.createJobConf();
+
+ Path inDir = new Path(TEST_ROOT_DIR + "/input");
+ Path outDir = new Path(TEST_ROOT_DIR + "/output");
+ FileSystem fs = FileSystem.get(conf);
+ if (fs.exists(outDir)) {
+ fs.delete(outDir, true);
+ }
+ if (!fs.exists(inDir)) {
+ fs.mkdirs(inDir);
+ }
+ String input = "The quick brown fox jumped over the lazy dog";
+ DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+ file.writeBytes(input);
+ file.close();
+
+ conf.setInputFormat(TextInputFormat.class);
+ conf.setOutputKeyClass(LongWritable.class);
+ conf.setOutputValueClass(Text.class);
+
+ FileInputFormat.setInputPaths(conf, inDir);
+ FileOutputFormat.setOutputPath(conf, outDir);
+ conf.setNumMapTasks(1);
+ conf.setMaxMapAttempts(1);
+ conf.setNumReduceTasks(0);
+ conf.setMapperClass(TestMiniMRMapRedDebugScript.MapClass.class);
+
+ // copy debug script to cache from local file system.
+ Path scriptPath = new Path(TEST_ROOT_DIR, "debug-script.txt");
+ String debugScriptContent =
+ "for ((i=0;i<1000;i++)); " + "do "
+ + "echo \"Lots of logs! Lots of logs! "
+ + "Waiting to be truncated! Lots of logs!\";" + "done";
+ DataOutputStream scriptFile = fs.create(scriptPath);
+ scriptFile.writeBytes(debugScriptContent);
+ scriptFile.close();
+ new File(scriptPath.toUri().getPath()).setExecutable(true);
+
+ URI uri = scriptPath.toUri();
+ DistributedCache.createSymlink(conf);
+ DistributedCache.addCacheFile(uri, conf);
+ conf.setMapDebugScript(scriptPath.toUri().getPath());
+
+ RunningJob job = null;
+ try {
+ JobClient jc = new JobClient(conf);
+ job = jc.submitJob(conf);
+ try {
+ jc.monitorAndPrintJob(conf, job);
+ } catch (InterruptedException e) {
+ //
+ }
+ } catch (IOException ioe) {
+ } finally{
+ for (TaskCompletionEvent tce : job.getTaskCompletionEvents(0)) {
+ File debugOutFile =
+ TaskLog.getTaskLogFile(tce.getTaskAttemptId(),
+ TaskLog.LogName.DEBUGOUT);
+ if (debugOutFile.exists()) {
+ long length = debugOutFile.length();
+ assertTrue("DEBUGOUT log file length for "
+ + tce.getTaskAttemptId() + " is " + length
+ + " and not =10000", length == 10000);
+ }
+ }
+ }
+ } finally {
+ if (mr != null) {
+ mr.shutdown();
+ }
+ }
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Mar 4 04:03:23 2011
@@ -38,12 +38,11 @@ import org.apache.hadoop.mapred.TaskCont
import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Shell;
/**
@@ -75,6 +74,7 @@ public class TestTaskTrackerLocalization
protected Path attemptWorkDir;
protected File[] attemptLogFiles;
protected JobConf localizedTaskConf;
+ private TaskInProgress tip;
/**
* Dummy method in this base class. Only derived classes will define this
@@ -113,8 +113,14 @@ public class TestTaskTrackerLocalization
trackerFConf.setStrings("mapred.local.dir", localDirs);
// Create the job configuration file. Same as trackerConf in this test.
- JobConf jobConf = trackerFConf;
-
+ JobConf jobConf = new JobConf(trackerFConf);
+ // Set job view ACLs in conf sothat validation of contents of jobACLsFile
+ // can be done against this value. Have both users and groups
+ String jobViewACLs = "user1,user2, group1,group2";
+ jobConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACLs);
+ jobConf.setInt("mapred.userlog.retain.hours", 0);
+ String jtIdentifier = "200907202331";
+ jobId = new JobID(jtIdentifier, 1);
// JobClient uploads the job jar to the file system and sets it in the
// jobConf.
@@ -123,9 +129,16 @@ public class TestTaskTrackerLocalization
// JobClient uploads the jobConf to the file system.
File jobConfFile = uploadJobConf(jobConf);
+ // create jobTokens file
+ uploadJobTokensFile();
+
// Set up the TaskTracker
tracker = new TaskTracker();
tracker.setConf(trackerFConf);
+ tracker.setIndexCache(new IndexCache(trackerFConf));
+ tracker.setUserLogManager(new UtilsForTests.InLineUserLogManager(
+ trackerFConf));
+ tracker.setTaskMemoryManagerEnabledFlag();
// for test case system FS is the local FS
@@ -136,13 +149,6 @@ public class TestTaskTrackerLocalization
taskTrackerUGI = UserGroupInformation.getCurrentUser();
// Set up the task to be localized
- String jtIdentifier = "200907202331";
- jobId = new JobID(jtIdentifier, 1);
-
- TaskTracker.RunningJob rjob = new TaskTracker.RunningJob(jobId);
- rjob.ugi = UserGroupInformation.getCurrentUser();
- tracker.runningJobs.put(jobId, rjob);
-
taskId =
new TaskAttemptID(jtIdentifier, jobId.getId(), true, 1, 0);
task =
@@ -150,8 +156,6 @@ public class TestTaskTrackerLocalization
task.setConf(jobConf); // Set conf. Set user name in particular.
task.setUser(UserGroupInformation.getCurrentUser().getUserName());
- // create jobTokens file
- uploadJobTokensFile();
taskController = new DefaultTaskController();
taskController.setConf(trackerFConf);
@@ -160,6 +164,10 @@ public class TestTaskTrackerLocalization
tracker.setTaskController(taskController);
tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
taskController));
+
+ // mimic register task
+ // create the tip
+ tip = tracker.new TaskInProgress(task, trackerFConf);
}
/**
@@ -356,24 +364,8 @@ public class TestTaskTrackerLocalization
if (!canRun()) {
return;
}
- tracker.getLocalizer().initializeUserDirs(task.getUser());
-
- // /////////// The main method being tested
- localizedJobConf = tracker.localizeJobFiles(task,
- new TaskTracker.RunningJob(task.getJobID()));
- // ///////////
-
- // Now initialize the job via task-controller so as to set
- // ownership/permissions of jars, job-work-dir
- JobInitializationContext context = new JobInitializationContext();
- context.jobid = jobId;
- context.user = task.getUser();
- context.workDir =
- new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
-
- // /////////// The method being tested
- taskController.initializeJob(context);
- // ///////////
+ TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+ localizedJobConf = rjob.getJobConf();
checkJobLocalization();
}
@@ -441,6 +433,13 @@ public class TestTaskTrackerLocalization
assertTrue(
"mapred.jar is not set properly to the target users directory : "
+ localizedJobJar, mapredJarFlag);
+
+ // check job user-log directory permissions
+ File jobLogDir = TaskLog.getJobDir(jobId);
+ assertTrue("job log directory " + jobLogDir + " does not exist!", jobLogDir
+ .exists());
+ checkFilePermissions(jobLogDir.toString(), "drwx------", task.getUser(),
+ taskTrackerUGI.getGroupNames()[0]);
}
/**
@@ -453,25 +452,9 @@ public class TestTaskTrackerLocalization
if (!canRun()) {
return;
}
- tracker.getLocalizer().initializeUserDirs(task.getUser());
- localizedJobConf = tracker.localizeJobFiles(task,
- new TaskTracker.RunningJob(task.getJobID()));
-
- // Set job view ACLs in conf sothat validation of contents of jobACLsFile
- // can be done against this value. Have both users and groups
- String jobViewACLs = "user1,user2, group1,group2";
- localizedJobConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACLs);
-
- // Now initialize the job via task-controller so as to set
- // ownership/permissions of jars, job-work-dir
- JobInitializationContext jobContext = new JobInitializationContext();
- jobContext.jobid = jobId;
- jobContext.user = task.getUser();
- jobContext.workDir =
- new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
- taskController.initializeJob(jobContext);
+ TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+ localizedJobConf = rjob.getJobConf();
- TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
tip.setJobConf(localizedJobConf);
// ////////// The central method being tested
@@ -552,9 +535,7 @@ public class TestTaskTrackerLocalization
.getPath(), "tmp").exists());
// Make sure that the logs are setup properly
- File logDir =
- new File(HADOOP_LOG_DIR, TaskLog.USERLOGS_DIR_NAME + Path.SEPARATOR
- + task.getTaskID().toString());
+ File logDir = TaskLog.getAttemptDir(taskId.toString());
assertTrue("task's log dir " + logDir.toString() + " doesn't exist!",
logDir.exists());
checkFilePermissions(logDir.getAbsolutePath(), "drwx------", task
@@ -709,21 +690,12 @@ public class TestTaskTrackerLocalization
private void testTaskCleanup(boolean needCleanup, boolean jvmReuse)
throws Exception {
// Localize job and localize task.
- tracker.getLocalizer().initializeUserDirs(task.getUser());
- localizedJobConf = tracker.localizeJobFiles(task,
- new TaskTracker.RunningJob(task.getJobID()));
+ TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
+ TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+ localizedJobConf = rjob.getJobConf();
if (jvmReuse) {
localizedJobConf.setNumTasksToExecutePerJvm(2);
}
- // Now initialize the job via task-controller so as to set
- // ownership/permissions of jars, job-work-dir
- JobInitializationContext jobContext = new JobInitializationContext();
- jobContext.jobid = jobId;
- jobContext.user = localizedJobConf.getUser();
- jobContext.workDir =
- new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
- taskController.initializeJob(jobContext);
- TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
tip.setJobConf(localizedJobConf);
tip.localizeTask(task);
Path workDir =
@@ -777,22 +749,17 @@ public class TestTaskTrackerLocalization
*/
private void verifyUserLogsCleanup()
throws IOException {
- Path logDir =
- new Path(HADOOP_LOG_DIR.toURI().getPath(), TaskLog.USERLOGS_DIR_NAME
- + Path.SEPARATOR + task.getTaskID().toString());
-
+ // verify user logs cleanup
+ File jobUserLogDir = TaskLog.getJobDir(jobId);
// Logs should be there before cleanup.
- assertTrue("Userlogs dir " + logDir + " is not presen as expected!!",
- tracker.getLocalFileSystem().exists(logDir));
-
- // ////////// Another being tested
- TaskLog.cleanup(-1); // -1 so as to move purgeTimeStamp to future and file
- // modification time behind retainTimeStatmp
- // //////////
-
+ assertTrue("Userlogs dir " + jobUserLogDir + " is not present as expected!!",
+ jobUserLogDir.exists());
+ tracker.purgeJob(new KillJobAction(jobId));
+ tracker.getUserLogManager().getUserLogCleaner().processCompletedJobs();
+
// Logs should be gone after cleanup.
- assertFalse("Userlogs dir " + logDir + " is not deleted as expected!!",
- tracker.getLocalFileSystem().exists(logDir));
+ assertFalse("Userlogs dir " + jobUserLogDir + " is not deleted as expected!!",
+ jobUserLogDir.exists());
}
/**
@@ -806,24 +773,12 @@ public class TestTaskTrackerLocalization
}
LOG.info("Running testJobCleanup()");
- // Localize job and localize task.
- tracker.getLocalizer().initializeUserDirs(task.getUser());
- localizedJobConf =
- tracker.localizeJobFiles(task,
- new TaskTracker.RunningJob(task.getJobID()));
-
- // Now initialize the job via task-controller so as to set
- // ownership/permissions of job-work-dir
- JobInitializationContext jobContext = new JobInitializationContext();
- jobContext.jobid = jobId;
- jobContext.user = localizedJobConf.getUser();
- jobContext.workDir =
- new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
- taskController.initializeJob(jobContext);
-
// Set an inline cleanup queue
InlineCleanupQueue cleanupQueue = new InlineCleanupQueue();
tracker.setCleanupThread(cleanupQueue);
+ // Localize job and localize task.
+ TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+ localizedJobConf = rjob.getJobConf();
// Create a file in job's work-dir with 555
String jobWorkDir =
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java?rev=1077322&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java Fri Mar 4 04:03:23 2011
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.*;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Test;
+
+public class TestUserLogCleanup {
+ private static String jtid = "test";
+ private static long ONE_HOUR = 1000 * 60 * 60;
+ private Localizer localizer;
+ private UserLogManager userLogManager;
+ private UserLogCleaner userLogCleaner;
+ private TaskTracker tt;
+ private FakeClock myClock;
+ private JobID jobid1 = new JobID(jtid, 1);
+ private JobID jobid2 = new JobID(jtid, 2);
+ private JobID jobid3 = new JobID(jtid, 3);
+ private JobID jobid4 = new JobID(jtid, 4);
+ private File foo = new File(TaskLog.getUserLogDir(), "foo");
+ private File bar = new File(TaskLog.getUserLogDir(), "bar");
+
+ public TestUserLogCleanup() throws IOException {
+ Configuration conf = new Configuration();
+ startTT(conf);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ FileUtil.fullyDelete(TaskLog.getUserLogDir());
+ }
+
+ private File localizeJob(JobID jobid) throws IOException {
+ File jobUserlog = TaskLog.getJobDir(jobid);
+ // localize job log directory
+ tt.initializeJobLogDir(jobid);
+ assertTrue(jobUserlog + " directory is not created.", jobUserlog.exists());
+ return jobUserlog;
+ }
+
+ private void jobFinished(JobID jobid, int logRetainHours) {
+ JobCompletedEvent jce = new JobCompletedEvent(jobid, myClock.getTime(),
+ logRetainHours);
+ userLogManager.addLogEvent(jce);
+ }
+
+ private void startTT(Configuration conf) throws IOException {
+ myClock = new FakeClock(); // clock is reset.
+ tt = new TaskTracker();
+ localizer = new Localizer(FileSystem.get(conf), conf
+ .getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY),
+ new DefaultTaskController());
+ tt.setLocalizer(localizer);
+ userLogManager = new UtilsForTests.InLineUserLogManager(conf);
+ userLogCleaner = userLogManager.getUserLogCleaner();
+ userLogCleaner.setClock(myClock);
+ tt.setUserLogManager(userLogManager);
+ userLogManager.clearOldUserLogs(conf);
+ }
+
+ private void ttReinited() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setInt(JobContext.USER_LOG_RETAIN_HOURS, 3);
+ userLogManager.clearOldUserLogs(conf);
+ }
+
+ private void ttRestarted() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setInt(JobContext.USER_LOG_RETAIN_HOURS, 3);
+ startTT(conf);
+ }
+
+ /**
+ * Tests job user-log directory deletion.
+ *
+ * Adds two jobs for log deletion. One with one hour retain hours, other with
+ * two retain hours. After an hour,
+ * TaskLogCleanupThread.processCompletedJobs() call, makes sure job with 1hr
+ * retain hours is removed and other is retained. After one more hour, job
+ * with 2hr retain hours is also removed.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testJobLogCleanup() throws IOException {
+ File jobUserlog1 = localizeJob(jobid1);
+ File jobUserlog2 = localizeJob(jobid2);
+
+ // add job user log directory for deletion, with 2 hours for deletion
+ jobFinished(jobid1, 2);
+
+ // add the job for deletion with one hour as retain hours
+ jobFinished(jobid2, 1);
+
+ // remove old logs and see jobid1 is not removed and jobid2 is removed
+ myClock.advance(ONE_HOUR);
+ userLogCleaner.processCompletedJobs();
+ assertTrue(jobUserlog1 + " got deleted", jobUserlog1.exists());
+ assertFalse(jobUserlog2 + " still exists.", jobUserlog2.exists());
+
+ myClock.advance(ONE_HOUR);
+ // remove old logs and see jobid1 is removed now
+ userLogCleaner.processCompletedJobs();
+ assertFalse(jobUserlog1 + " still exists.", jobUserlog1.exists());
+ }
+
+ /**
+ * Tests user-log directory cleanup on a TT re-init with 3 hours as log retain
+ * hours for tracker.
+ *
+ * Adds job1 deletion before the re-init with 2 hour retain hours. Adds job2
+ * for which there are no tasks/killJobAction after the re-init. Adds job3 for
+ * which there is localizeJob followed by killJobAction with 3 hours as retain
+ * hours. Adds job4 for which there are some tasks after the re-init.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testUserLogCleanup() throws IOException {
+ File jobUserlog1 = localizeJob(jobid1);
+ File jobUserlog2 = localizeJob(jobid2);
+ File jobUserlog3 = localizeJob(jobid3);
+ File jobUserlog4 = localizeJob(jobid4);
+ // create a some files/dirs in userlog
+ foo.mkdirs();
+ bar.createNewFile();
+
+ // add the jobid1 for deletion with retainhours = 2
+ jobFinished(jobid1, 2);
+
+ // time is now 1.
+ myClock.advance(ONE_HOUR);
+
+ // mimic TaskTracker reinit
+ // re-init the tt with 3 hours as user log retain hours.
+ // This re-init clears the user log directory
+ // job directories will be added with 3 hours as retain hours.
+ // i.e. They will be deleted at time 4.
+ ttReinited();
+
+ assertFalse(foo.exists());
+ assertFalse(bar.exists());
+ assertTrue(jobUserlog1.exists());
+ assertTrue(jobUserlog2.exists());
+ assertTrue(jobUserlog3.exists());
+ assertTrue(jobUserlog4.exists());
+
+ myClock.advance(ONE_HOUR);
+ // time is now 2.
+ userLogCleaner.processCompletedJobs();
+ assertFalse(jobUserlog1.exists());
+ assertTrue(jobUserlog2.exists());
+ assertTrue(jobUserlog3.exists());
+ assertTrue(jobUserlog4.exists());
+
+ // mimic localizeJob followed KillJobAction for jobid3
+ // add the job for deletion with retainhours = 3.
+ // jobid3 should be deleted at time 5.
+ jobUserlog3 = localizeJob(jobid3);
+ jobFinished(jobid3, 3);
+
+ // mimic localizeJob for jobid4
+ jobUserlog4 = localizeJob(jobid4);
+
+ // do cleanup
+ myClock.advance(2 * ONE_HOUR);
+ // time is now 4.
+ userLogCleaner.processCompletedJobs();
+
+ // jobid2 will be deleted
+ assertFalse(jobUserlog1.exists());
+ assertFalse(jobUserlog2.exists());
+ assertTrue(jobUserlog3.exists());
+ assertTrue(jobUserlog4.exists());
+
+ myClock.advance(ONE_HOUR);
+ // time is now 5.
+ // do cleanup again
+ userLogCleaner.processCompletedJobs();
+
+ // jobid3 will be deleted
+ assertFalse(jobUserlog1.exists());
+ assertFalse(jobUserlog2.exists());
+ assertFalse(jobUserlog3.exists());
+ assertTrue(jobUserlog4.exists());
+ }
+
+ /**
+ * Tests user-log directory cleanup on a TT restart.
+ *
+ * Adds job1 deletion before the restart with 2 hour retain hours. Adds job2
+ * for which there are no tasks/killJobAction after the restart. Adds job3 for
+ * which there is localizeJob followed by killJobAction after the restart with
+ * 3 hours retain hours. Adds job4 for which there are some tasks after the
+ * restart.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testUserLogCleanupAfterRestart() throws IOException {
+ File jobUserlog1 = localizeJob(jobid1);
+ File jobUserlog2 = localizeJob(jobid2);
+ File jobUserlog3 = localizeJob(jobid3);
+ File jobUserlog4 = localizeJob(jobid4);
+ // create a some files/dirs in userlog
+ foo.mkdirs();
+ bar.createNewFile();
+
+ // add the jobid1 for deletion with retain hours = 2
+ jobFinished(jobid1, 2);
+
+ // time is now 1.
+ myClock.advance(ONE_HOUR);
+
+ // Mimic the TaskTracker restart
+ // Restart the tt with 3 hours as user log retain hours.
+ // This restart clears the user log directory
+ // job directories will be added with 3 hours as retain hours.
+ // i.e. They will be deleted at time 3 as clock will reset after the restart
+ ttRestarted();
+
+ assertFalse(foo.exists());
+ assertFalse(bar.exists());
+ assertTrue(jobUserlog1.exists());
+ assertTrue(jobUserlog2.exists());
+ assertTrue(jobUserlog3.exists());
+ assertTrue(jobUserlog4.exists());
+
+ myClock.advance(ONE_HOUR);
+ // time is now 1.
+ userLogCleaner.processCompletedJobs();
+ assertTrue(jobUserlog1.exists());
+ assertTrue(jobUserlog2.exists());
+ assertTrue(jobUserlog3.exists());
+ assertTrue(jobUserlog4.exists());
+
+ // mimic localizeJob followed KillJobAction for jobid3
+ // add the job for deletion with retainhours = 3.
+ // jobid3 should be deleted at time 4.
+ jobUserlog3 = localizeJob(jobid3);
+ jobFinished(jobid3, 3);
+
+ // mimic localizeJob for jobid4
+ jobUserlog4 = localizeJob(jobid4);
+
+ // do cleanup
+ myClock.advance(2 * ONE_HOUR);
+ // time is now 3.
+ userLogCleaner.processCompletedJobs();
+
+ // jobid1 and jobid2 will be deleted
+ assertFalse(jobUserlog1.exists());
+ assertFalse(jobUserlog2.exists());
+ assertTrue(jobUserlog3.exists());
+ assertTrue(jobUserlog4.exists());
+
+ myClock.advance(ONE_HOUR);
+ // time is now 4.
+ // do cleanup again
+ userLogCleaner.processCompletedJobs();
+
+ // jobid3 will be deleted
+ assertFalse(jobUserlog1.exists());
+ assertFalse(jobUserlog2.exists());
+ assertFalse(jobUserlog3.exists());
+ assertTrue(jobUserlog4.exists());
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java Fri Mar 4 04:03:23 2011
@@ -18,6 +18,8 @@
package org.apache.hadoop.mapred;
+import static org.junit.Assert.fail;
+
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.io.*;
@@ -47,6 +49,8 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.UserLogEvent;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.UserLogManager;
import org.apache.hadoop.util.StringUtils;
/**
@@ -719,6 +723,26 @@ public class UtilsForTests {
}
}
+ /**
+ * This is an in-line {@link UserLogManager} to do all the actions in-line.
+ */
+ static class InLineUserLogManager extends UserLogManager {
+ public InLineUserLogManager(Configuration conf) throws IOException {
+ super(conf);
+ getUserLogCleaner().setCleanupQueue(new InlineCleanupQueue());
+ }
+
+ // do the action in-line
+ public void addLogEvent(UserLogEvent event) {
+ try {
+ super.addLogEvent(event);
+ super.monitor();
+ } catch (Exception e) {
+ fail("failed to process action " + event.getEventType());
+ }
+ }
+ }
+
static void setUpConfigFile(Properties confProps, File configFile)
throws IOException {
Configuration config = new Configuration(false);