You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:39:22 UTC

svn commit: r1077640 - in /hadoop/common/branches/branch-0.20-security-patches/src: contrib/streaming/src/test/system/org/apache/hadoop/mapred/ test/system/java/org/apache/hadoop/mapred/

Author: omalley
Date: Fri Mar  4 04:39:22 2011
New Revision: 1077640

URL: http://svn.apache.org/viewvc?rev=1077640&view=rev
Log:
commit afba6e20c76fa75bb1f97e5fb4d26fc8de00364f
Author: Vinay Kumar Thota <vi...@yahoo-inc.com>
Date:   Thu Aug 5 04:58:36 2010 +0000

    3867536 Fix the instability MR system tests from

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestStreamingJobProcessTree.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestTaskKillingOfStreamingJob.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/HighRamJobHelper.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCacheFileReferenceCount.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobSummary.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestStreamingJobProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestStreamingJobProcessTree.java?rev=1077640&r1=1077639&r2=1077640&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestStreamingJobProcessTree.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestStreamingJobProcessTree.java Fri Mar  4 04:39:22 2011
@@ -1,297 +1,351 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapreduce.test.system.JTClient;
-import org.apache.hadoop.mapreduce.test.system.JobInfo;
-import org.apache.hadoop.mapreduce.test.system.MRCluster;
-import org.apache.hadoop.mapreduce.test.system.TaskInfo;
-import org.apache.hadoop.mapreduce.test.system.JTProtocol;
-import org.apache.hadoop.streaming.StreamJob;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.regex.Pattern;
-import java.util.regex.Matcher;
-
-/**
- * Increase memory usage beyond the memory limits of streaming job and 
- * verify whether task manager logs the process tree status 
- * before killing or not.
- */
-public class TestStreamingJobProcessTree {
-  private static final Log LOG = LogFactory
-     .getLog(TestStreamingJobProcessTree.class);
-  private static MRCluster cluster;
-  private static Configuration conf = new Configuration();
-  private static Path inputDir = new Path("input");
-  private static Path outputDir = new Path("output");
-  
-  @BeforeClass
-  public static void before() throws Exception {
-    String [] excludeExpList = {"java.net.ConnectException",
-      "java.io.IOException"};
-    cluster = MRCluster.createCluster(conf);
-    cluster.setExcludeExpList(excludeExpList);
-    cluster.setUp();
-    conf = cluster.getJTClient().getProxy().getDaemonConf();
-    createInput(inputDir, conf);
-  }
-  @AfterClass
-  public static void after() throws Exception {
-   cleanup(inputDir, conf);
-   cleanup(outputDir, conf);
-   cluster.tearDown();
-  }
-  
-  /**
-   * Increase the memory limit for map task and verify whether the 
-   * task manager logs the process tree status before killing or not.
-   * @throws IOException - If an I/O error occurs.
-   */
-  @Test
-  public void testStreamingJobProcTreeCleanOfMapTask() throws
-     IOException {
-    String runtimeArgs [] = {
-        "-D", "mapred.job.name=ProcTreeStreamJob",
-        "-D", "mapred.map.tasks=1",
-        "-D", "mapred.reduce.tasks=0",
-        "-D", "mapred.map.max.attempts=1",
-        "-D", "mapred.cluster.max.map.memory.mb=2",
-        "-D", "mapred.cluster.map.memory.mb=1",
-        "-D", "mapred.job.map.memory.mb=0.5"
-    };
-    
-    String [] otherArgs = new String[] {
-            "-input", inputDir.toString(),
-            "-output", outputDir.toString(),
-            "-mapper", "ProcessTree.sh",
-    };
-    JobID jobId = getJobId(runtimeArgs, otherArgs);
-    LOG.info("Job ID:" + jobId);
-    Assert.assertNotNull("Job ID not found for 1 min", jobId);
-    Assert.assertTrue("Job has not been started for 1 min.", 
-        cluster.getJTClient().isJobStarted(jobId));
-    TaskInfo taskInfo = getTaskInfo(jobId);
-    Assert.assertNotNull("TaskInfo is null",taskInfo);
-    Assert.assertTrue("Task has not been started for 1 min.", 
-        cluster.getJTClient().isTaskStarted(taskInfo));
-    JTProtocol wovenClient = cluster.getJTClient().getProxy();
-    int counter = 0;
-    while (counter++ < 60) {
-      if (taskInfo.getTaskStatus().length == 0) {
-        UtilsForTests.waitFor(1000);
-        taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID());
-      }else if (taskInfo.getTaskStatus()[0].getRunState() ==
-          TaskStatus.State.RUNNING) {
-        UtilsForTests.waitFor(1000);
-        taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID());
-      }
-    }
-
-    verifyProcessTreeOverLimit(taskInfo,jobId);
-  }
-  
-  /**
-   * Increase the memory limit for reduce task and verify whether the 
-   * task manager logs the process tree status before killing or not.
-   * @throws IOException - If an I/O error occurs.
-   */
-  @Test
-  public void testStreamingJobProcTreeCleanOfReduceTask() throws
-     IOException {
-    String runtimeArgs [] = {
-            "-D", "mapred.job.name=ProcTreeStreamJob",
-            "-D", "mapred.reduce.tasks=2",
-            "-D", "mapred.reduce.max.attempts=1",
-            "-D", "mapred.cluster.max.reduce.memory.mb=2",
-            "-D", "mapred.cluster.reduce.memory.mb=1",
-            "-D","mapred.job.reduce.memory.mb=0.5"};
-
-    String [] otherArgs = new String[] {
-            "-input", inputDir.toString(),
-            "-output", outputDir.toString(),
-            "-reducer", "ProcessTree.sh"
-    };
-
-    JobID jobId = getJobId(runtimeArgs, otherArgs);
-    Assert.assertNotNull("Job ID not found for 1 min", jobId);
-    Assert.assertTrue("Job has not been started for 1 min.", 
-        cluster.getJTClient().isJobStarted(jobId));
-    TaskInfo taskInfo = getTaskInfo(jobId);
-    Assert.assertNotNull("TaskInfo is null",taskInfo);
-    Assert.assertTrue("Task has not been started for 1 min.", 
-        cluster.getJTClient().isTaskStarted(taskInfo));    
-    JTProtocol wovenClient = cluster.getJTClient().getProxy();
-    int counter = 0;
-    while (counter++ < 60) {
-      if (taskInfo.getTaskStatus().length == 0) {
-        UtilsForTests.waitFor(1000);
-        taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID());
-      }else if (taskInfo.getTaskStatus()[0].getRunState() == 
-          TaskStatus.State.RUNNING) {
-        UtilsForTests.waitFor(1000);
-        taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID());
-      }
-    }
-    verifyProcessTreeOverLimit(taskInfo,jobId);
-  }
-  
-  private void verifyProcessTreeOverLimit(TaskInfo taskInfo, JobID jobId) 
-      throws IOException {
-    String taskOverLimitPatternString = 
-      "TaskTree \\[pid=[0-9]*,tipID=.*\\] is "
-      + "running beyond memory-limits. "
-      + "Current usage : [0-9]*bytes. Limit : %sbytes. Killing task.";
-    Pattern taskOverLimitPattern = 
-        Pattern.compile(String.format(taskOverLimitPatternString, 
-        String.valueOf(500 * 1024L)));
-    LOG.info("Task OverLimit Pattern:" + taskOverLimitPattern);
-    TaskID tID = TaskID.downgrade(taskInfo.getTaskID());
-    TaskAttemptID taskAttID = new TaskAttemptID(tID, 0);
-    JobClient jobClient = cluster.getJTClient().getClient();
-    RunningJob runJob = jobClient.getJob(jobId);
-    String[] taskDiagnostics = runJob.getTaskDiagnostics(taskAttID);
-    Assert.assertNotNull("Task diagnostics is null.", taskDiagnostics);
-    for (String strVal : taskDiagnostics) {
-      Matcher mat = taskOverLimitPattern.matcher(strVal);
-      Assert.assertTrue("Taskover limit error message is not matched.", 
-          mat.find());
-    }
-  }
-  
-  private String[] buildArgs(String [] runtimeArgs, String[] otherArgs) {
-    String shellFile = System.getProperty("user.dir") + 
-        "/src/test/system/scripts/ProcessTree.sh";
-    
-    String fileArgs[] = new String[] {"-files", shellFile };
-    int size = fileArgs.length + runtimeArgs.length + otherArgs.length;
-    String args[]= new String[size];
-    int index = 0;
-    for (String fileArg : fileArgs) {
-      args[index++] = fileArg;
-    }
-    for (String runtimeArg : runtimeArgs) {
-      args[index++] = runtimeArg;
-    }
-    for (String otherArg : otherArgs) {
-      args[index++] = otherArg;
-    }
-    return args;
-  }
-  
-  private JobID getJobId(String [] runtimeArgs, String [] otherArgs) 
-      throws IOException {
-    JobID jobId = null;
-    final RunStreamJob runSJ;
-    StreamJob streamJob = new StreamJob();
-    int counter = 0;
-    JTClient jtClient = cluster.getJTClient();
-    JobClient jobClient = jtClient.getClient();
-    int totalJobs = jobClient.getAllJobs().length;
-    String [] args = buildArgs(runtimeArgs, otherArgs);
-    cleanup(outputDir, conf);
-    runSJ = new RunStreamJob(conf, streamJob, args);
-    runSJ.start();
-    while (counter++ < 60) {
-      if (jobClient.getAllJobs().length - totalJobs == 0) {
-        UtilsForTests.waitFor(1000);
-      } else if (jobClient.getAllJobs()[0].getRunState() == JobStatus.RUNNING) {
-        jobId = jobClient.getAllJobs()[0].getJobID();
-        break;
-      } else {
-        UtilsForTests.waitFor(1000);
-      }
-    }  
-    return jobId;
-  }
-  
-  private TaskInfo getTaskInfo(JobID jobId) 
-      throws IOException {
-    JTProtocol wovenClient = cluster.getJTClient().getProxy();
-    JobInfo jInfo = wovenClient.getJobInfo(jobId);
-    JobStatus jobStatus = jInfo.getStatus();
-    // Make sure that map is running and start progress 10%. 
-    while (jobStatus.mapProgress() < 0.1f) {
-      UtilsForTests.waitFor(100);
-      jobStatus = wovenClient.getJobInfo(jobId).getStatus();
-    }
-    TaskInfo[] taskInfos = wovenClient.getTaskInfo(jobId);
-    for (TaskInfo taskinfo : taskInfos) {
-      if (!taskinfo.isSetupOrCleanup()) {
-        return taskinfo;
-      }
-    }
-    return null;
-  }
-
-  private static void createInput(Path inDir, Configuration conf) 
-     throws IOException {
-    FileSystem fs = inDir.getFileSystem(conf);
-    if (!fs.mkdirs(inDir)) {
-      throw new IOException("Failed to create the input directory:" 
-          + inDir.toString());
-    }
-    fs.setPermission(inDir, new FsPermission(FsAction.ALL, 
-    FsAction.ALL, FsAction.ALL));
-    DataOutputStream file = fs.create(new Path(inDir, "data.txt"));
-    String input="Process tree cleanup of Streaming job tasks.";
-    file.writeBytes(input + "\n");
-    file.close();
-  }
-
-  private static void cleanup(Path dir, Configuration conf) 
-      throws IOException {
-    FileSystem fs = dir.getFileSystem(conf);
-    fs.delete(dir, true);
-  }
-  
-  class RunStreamJob extends Thread {
-    Configuration jobConf;
-    Tool tool;
-    String [] args;
-    public RunStreamJob(Configuration jobConf, Tool tool, 
-        String [] args) {
-      this.jobConf = jobConf;
-      this.tool = tool;
-      this.args = args;
-    }
-    public void run() {
-      try {
-        ToolRunner.run(jobConf, tool, args);
-      } catch(InterruptedException iexp) {
-        LOG.warn("Thread is interrupted:" + iexp.getMessage());
-      } catch(Exception exp) {
-        LOG.warn("Exception:" + exp.getMessage());
-      }
-    }
-  }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.test.system.JTClient;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.streaming.StreamJob;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+
+/**
+ * Increase memory usage beyond the memory limits of streaming job and 
+ * verify whether task manager logs the process tree status 
+ * before killing or not.
+ */
+public class TestStreamingJobProcessTree {
+  private static final Log LOG = LogFactory
+     .getLog(TestStreamingJobProcessTree.class);
+  private static MRCluster cluster;
+  private static Configuration conf = new Configuration();
+  private static Path inputDir = new Path("input");
+  private static Path outputDir = new Path("output");
+  
+  @BeforeClass
+  public static void before() throws Exception {
+    String [] excludeExpList = {"java.net.ConnectException",
+      "java.io.IOException"};
+    cluster = MRCluster.createCluster(conf);
+    cluster.setExcludeExpList(excludeExpList);
+    cluster.setUp();
+    conf = cluster.getJTClient().getProxy().getDaemonConf();
+    createInput(inputDir, conf);
+  }
+  @AfterClass
+  public static void after() throws Exception {
+   cleanup(inputDir, conf);
+   cleanup(outputDir, conf);
+   cluster.tearDown();
+  }
+  
+  /**
+   * Increase the memory limit for map task and verify whether the 
+   * task manager logs the process tree status before killing or not.
+   * @throws IOException - If an I/O error occurs.
+   */
+  @Test
+  public void testStreamingJobProcTreeCleanOfMapTask() throws
+     IOException {
+    String runtimeArgs [] = {
+        "-D", "mapred.job.name=ProcTreeStreamJob",
+        "-D", "mapred.map.tasks=1",
+        "-D", "mapred.reduce.tasks=0",
+        "-D", "mapred.map.max.attempts=1",
+        "-D", "mapred.cluster.max.map.memory.mb=2048",
+        "-D", "mapred.cluster.reduce.memory.mb=1024",
+        "-D", "mapred.cluster.max.reduce.memory.mb=2048",
+        "-D", "mapred.cluster.map.memory.mb=1024",
+        "-D", "mapred.job.map.memory.mb=512"
+    };
+    
+    String [] otherArgs = new String[] {
+            "-input", inputDir.toString(),
+            "-output", outputDir.toString(),
+            "-mapper", "ProcessTree.sh",
+    };
+    JobID jobId = getJobId(runtimeArgs, otherArgs);
+    LOG.info("Job ID:" + jobId);
+    if (jobId == null) {
+      jobId = getJobId(runtimeArgs, otherArgs);
+    }
+    Assert.assertNotNull("Job ID not found for 1 min", jobId);
+    Assert.assertTrue("Job has not been started for 1 min.", 
+        cluster.getJTClient().isJobStarted(jobId));
+    TaskInfo taskInfo = getTaskInfo(jobId, true);
+    Assert.assertNotNull("TaskInfo is null",taskInfo);
+    Assert.assertTrue("Task has not been started for 1 min.", 
+        cluster.getJTClient().isTaskStarted(taskInfo));
+    JTProtocol wovenClient = cluster.getJTClient().getProxy();
+    int counter = 0;
+    TaskInfo tempTaskInfo;
+    while (counter++ < 60) {
+      if (taskInfo.getTaskStatus().length == 0) {
+        UtilsForTests.waitFor(1000);
+        tempTaskInfo = taskInfo;
+        taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID());
+      }else if (taskInfo.getTaskStatus()[0].getRunState() ==
+          TaskStatus.State.RUNNING) {
+        UtilsForTests.waitFor(1000);
+        tempTaskInfo = taskInfo;
+        taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID());
+      } else {
+        break;
+      }
+
+      if (taskInfo == null) {
+        taskInfo = tempTaskInfo;
+        break;
+      }
+    }
+
+    verifyProcessTreeOverLimit(taskInfo,jobId);
+    JobInfo jInfo = wovenClient.getJobInfo(jobId);
+    LOG.info("Waiting till the job is completed...");
+    counter = 0;
+    while (counter++ < 60) {
+      if (jInfo == null) {
+        break;
+      } else if (jInfo.getStatus().isJobComplete()) {
+        break;
+      }
+      UtilsForTests.waitFor(100);
+      jInfo = wovenClient.getJobInfo(jobId);
+    }
+    UtilsForTests.waitFor(1000);
+  }
+  
+  /**
+   * Increase the memory limit for reduce task and verify whether the 
+   * task manager logs the process tree status before killing or not.
+   * @throws IOException - If an I/O error occurs.
+   */
+  @Test
+  public void testStreamingJobProcTreeCleanOfReduceTask() throws
+     IOException {
+    String runtimeArgs [] = {
+            "-D", "mapred.job.name=ProcTreeStreamJob",
+            "-D", "mapred.reduce.tasks=1",
+            "-D", "mapred.map.tasks=1",
+            "-D", "mapred.reduce.max.attempts=1",
+            "-D", "mapred.cluster.max.map.memory.mb=2048",
+            "-D", "mapred.cluster.map.memory.mb=1024",
+            "-D", "mapred.cluster.max.reduce.memory.mb=20248",
+            "-D", "mapred.cluster.reduce.memory.mb=1024",
+            "-D", "mapred.job.reduce.memory.mb=512"};
+
+    String [] otherArgs = new String[] {
+            "-input", inputDir.toString(),
+            "-output", outputDir.toString(),
+            "-mapper", "/bin/cat",
+            "-reducer", "ProcessTree.sh"
+    };
+
+    cleanup(outputDir, conf);
+    JobID jobId = getJobId(runtimeArgs, otherArgs);
+    if (jobId == null) {
+      jobId = getJobId(runtimeArgs, otherArgs);
+    }
+    Assert.assertNotNull("Job ID not found for 1 min", jobId);
+    Assert.assertTrue("Job has not been started for 1 min.", 
+        cluster.getJTClient().isJobStarted(jobId));
+    TaskInfo taskInfo = getTaskInfo(jobId, false);
+    Assert.assertNotNull("TaskInfo is null",taskInfo);
+    Assert.assertTrue("Task has not been started for 1 min.", 
+        cluster.getJTClient().isTaskStarted(taskInfo));    
+    JTProtocol wovenClient = cluster.getJTClient().getProxy();
+    int counter = 0;
+    TaskInfo tempTaskInfo;
+    while (counter++ < 60) {
+      if (taskInfo.getTaskStatus().length == 0) {
+        UtilsForTests.waitFor(1000);
+        tempTaskInfo = taskInfo;
+        taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID());
+      }else if (taskInfo.getTaskStatus()[0].getRunState() == 
+          TaskStatus.State.RUNNING) {
+        UtilsForTests.waitFor(1000);
+        tempTaskInfo = taskInfo;
+        taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID());
+      } else {
+        break;
+      }
+      if (taskInfo == null) {
+        taskInfo = tempTaskInfo;
+        break;
+      }
+    }
+    verifyProcessTreeOverLimit(taskInfo,jobId);
+    JobInfo jInfo = wovenClient.getJobInfo(jobId); 
+    LOG.info("Waiting till the job is completed...");
+    counter = 0;
+    while (counter++ < 60) {
+      if(jInfo == null) {
+        break;
+      } else if (jInfo.getStatus().isJobComplete()) {
+        break;
+      }
+      UtilsForTests.waitFor(1000);
+      jInfo = wovenClient.getJobInfo(jobId);
+    }
+  }
+  
+  private void verifyProcessTreeOverLimit(TaskInfo taskInfo, JobID jobId) 
+      throws IOException {
+    String taskOverLimitPatternString = 
+      "TaskTree \\[pid=[0-9]*,tipID=.*\\] is "
+      + "running beyond memory-limits. "
+      + "Current usage : [0-9]*bytes. Limit : %sbytes. Killing task.";
+    Pattern taskOverLimitPattern = 
+        Pattern.compile(String.format(taskOverLimitPatternString, 
+        String.valueOf(512 * 1024 * 1024L)));
+    LOG.info("Task OverLimit Pattern:" + taskOverLimitPattern);
+    TaskID tID = TaskID.downgrade(taskInfo.getTaskID());
+    TaskAttemptID taskAttID = new TaskAttemptID(tID, 0);
+    JobClient jobClient = cluster.getJTClient().getClient();
+    RunningJob runJob = jobClient.getJob(jobId);
+    String[] taskDiagnostics = runJob.getTaskDiagnostics(taskAttID);
+    Assert.assertNotNull("Task diagnostics is null.", taskDiagnostics);
+    for (String strVal : taskDiagnostics) {
+      Matcher mat = taskOverLimitPattern.matcher(strVal);
+      Assert.assertTrue("Taskover limit error message is not matched.", 
+          mat.find());
+    }
+  }
+  
+  private String[] buildArgs(String [] runtimeArgs, String[] otherArgs) {
+    String shellFile = System.getProperty("user.dir") + 
+        "/src/test/system/scripts/ProcessTree.sh";
+    
+    String fileArgs[] = new String[] {"-files", shellFile };
+    int size = fileArgs.length + runtimeArgs.length + otherArgs.length;
+    String args[]= new String[size];
+    int index = 0;
+    for (String fileArg : fileArgs) {
+      args[index++] = fileArg;
+    }
+    for (String runtimeArg : runtimeArgs) {
+      args[index++] = runtimeArg;
+    }
+    for (String otherArg : otherArgs) {
+      args[index++] = otherArg;
+    }
+    return args;
+  }
+  
+  private JobID getJobId(String [] runtimeArgs, String [] otherArgs) 
+      throws IOException {
+    JobID jobId = null;
+    final RunStreamJob runSJ;
+    StreamJob streamJob = new StreamJob();
+    int counter = 0;
+    JTClient jtClient = cluster.getJTClient();
+    JobClient jobClient = jtClient.getClient();
+    int totalJobs = jobClient.getAllJobs().length;
+    String [] args = buildArgs(runtimeArgs, otherArgs);
+    cleanup(outputDir, conf);
+    conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
+    runSJ = new RunStreamJob(conf, streamJob, args);
+    runSJ.start();
+    while (counter++ < 60) {
+      if (jobClient.getAllJobs().length - totalJobs == 0) {
+        UtilsForTests.waitFor(1000);
+      } else if (jobClient.getAllJobs()[0].getRunState() == JobStatus.RUNNING) {
+        jobId = jobClient.getAllJobs()[0].getJobID();
+        break;
+      } else {
+        UtilsForTests.waitFor(1000);
+      }
+    }  
+    return jobId;
+  }
+  
+  private TaskInfo getTaskInfo(JobID jobId, boolean isMap) 
+      throws IOException {
+    JTProtocol wovenClient = cluster.getJTClient().getProxy();
+    JobInfo jInfo = wovenClient.getJobInfo(jobId);
+    TaskInfo[] taskInfos = wovenClient.getTaskInfo(jobId);
+    for (TaskInfo taskinfo : taskInfos) {
+      if (!taskinfo.isSetupOrCleanup()) {
+        if (taskinfo.getTaskID().isMap() == isMap) {
+          return taskinfo;
+        }
+      }
+    }
+    return null;
+  }
+
+  private static void createInput(Path inDir, Configuration conf) 
+     throws IOException {
+    FileSystem fs = inDir.getFileSystem(conf);
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Failed to create the input directory:" 
+          + inDir.toString());
+    }
+    fs.setPermission(inDir, new FsPermission(FsAction.ALL, 
+    FsAction.ALL, FsAction.ALL));
+    DataOutputStream file = fs.create(new Path(inDir, "data.txt"));
+    String input="Process tree cleanup of Streaming job tasks.";
+    file.writeBytes(input + "\n");
+    file.close();
+  }
+
+  private static void cleanup(Path dir, Configuration conf) 
+      throws IOException {
+    FileSystem fs = dir.getFileSystem(conf);
+    fs.delete(dir, true);
+  }
+  
+  class RunStreamJob extends Thread {
+    Configuration jobConf;
+    Tool tool;
+    String [] args;
+    public RunStreamJob(Configuration jobConf, Tool tool, 
+        String [] args) {
+      this.jobConf = jobConf;
+      this.tool = tool;
+      this.args = args;
+    }
+    public void run() {
+      try {
+        ToolRunner.run(jobConf, tool, args);
+      } catch(InterruptedException iexp) {
+        LOG.warn("Thread is interrupted:" + iexp.getMessage());
+      } catch(Exception exp) {
+        LOG.warn("Exception:" + exp.getMessage());
+      }
+    }
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestTaskKillingOfStreamingJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestTaskKillingOfStreamingJob.java?rev=1077640&r1=1077639&r2=1077640&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestTaskKillingOfStreamingJob.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestTaskKillingOfStreamingJob.java Fri Mar  4 04:39:22 2011
@@ -217,6 +217,7 @@ public class TestTaskKillingOfStreamingJ
     int totalJobs = client.getAllJobs().length;
     String [] streamingArgs = generateArgs(runtimeArgs);
     cleanup(outputDir, conf);
+    conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
     final RunStreamingJob streamJobThread = new RunStreamingJob(conf,
         streamJob,streamingArgs);
     streamJobThread.start();

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/HighRamJobHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/HighRamJobHelper.java?rev=1077640&r1=1077639&r2=1077640&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/HighRamJobHelper.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/HighRamJobHelper.java Fri Mar  4 04:39:22 2011
@@ -33,6 +33,7 @@ public class HighRamJobHelper {
     String jobArgs []= {"-D","mapred.cluster.max.map.memory.mb=2048", 
                         "-D","mapred.cluster.max.reduce.memory.mb=2048", 
                         "-D","mapred.cluster.map.memory.mb=1024", 
+                        "-D","mapreduce.job.complete.cancel.delegation.tokens=false",
                         "-D","mapred.cluster.reduce.memory.mb=1024",
                         "-m", "6", 
                         "-r", "2", 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCacheFileReferenceCount.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCacheFileReferenceCount.java?rev=1077640&r1=1077639&r2=1077640&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCacheFileReferenceCount.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCacheFileReferenceCount.java Fri Mar  4 04:39:22 2011
@@ -1,283 +1,283 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.test.system.JobInfo;
-import org.apache.hadoop.mapreduce.test.system.MRCluster;
-import org.apache.hadoop.mapreduce.test.system.JTProtocol;
-import org.apache.hadoop.mapreduce.test.system.JTClient;
-import org.apache.hadoop.mapreduce.test.system.TTClient;
-import org.apache.hadoop.mapreduce.test.system.TaskInfo;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.examples.SleepJob;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import java.net.URI;
-import java.io.DataOutputStream;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Assert;
-import org.junit.Test;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Verify the job cache files localizations.
- */
-public class TestCacheFileReferenceCount {
-  private static final Log LOG = LogFactory
-          .getLog(TestCacheFileReferenceCount.class);
-  private static Configuration conf = new Configuration();
-  private static MRCluster cluster;
-  private static Path tmpFolderPath = null;
-  private static String cacheFile1 = "cache1.txt";
-  private static String cacheFile2 = "cache2.txt";
-  private static String cacheFile3 = "cache3.txt";
-  private static String cacheFile4 = "cache4.txt";
-  private static JTProtocol wovenClient = null;
-  private static JobClient jobClient = null;
-  private static JTClient jtClient = null;
-  private static URI cacheFileURI1;
-  private static URI cacheFileURI2;
-  private static URI cacheFileURI3;
-  private static URI cacheFileURI4;
-  
-  @BeforeClass
-  public static void before() throws Exception {
-    cluster = MRCluster.createCluster(conf);
-    cluster.setUp();
-    tmpFolderPath = new Path("hdfs:///tmp");
-    jtClient = cluster.getJTClient();
-    jobClient = jtClient.getClient();
-    wovenClient = cluster.getJTClient().getProxy();
-    cacheFileURI1 = createCacheFile(tmpFolderPath, cacheFile1);
-    cacheFileURI2 = createCacheFile(tmpFolderPath, cacheFile2);
-  }
-  
-  @AfterClass
-  public static void after() throws Exception {
-    deleteCacheFile(new Path(tmpFolderPath, cacheFile1));
-    deleteCacheFile(new Path(tmpFolderPath, cacheFile2));
-    deleteCacheFile(new Path(tmpFolderPath, cacheFile4));
-    cluster.tearDown();
-  }
-  
-  /**
-   * Run the job with two distributed cache files and verify
-   * whether job is succeeded or not.
-   * @throws Exception
-   */
-  @Test
-  public void testCacheFilesLocalization() throws Exception {
-    conf = wovenClient.getDaemonConf();
-    SleepJob job = new SleepJob();
-    job.setConf(conf);
-    JobConf jobConf = job.setupJobConf(4, 1, 4000, 4000, 1000, 1000);
-    DistributedCache.createSymlink(jobConf);
-    DistributedCache.addCacheFile(cacheFileURI1, jobConf);
-    DistributedCache.addCacheFile(cacheFileURI2, jobConf);
-    RunningJob runJob = jobClient.submitJob(jobConf);
-    JobID jobId = runJob.getID();
-
-    Assert.assertTrue("Job has not been started for 1 min.", 
-        jtClient.isJobStarted(jobId));
-    TaskInfo[] taskInfos = wovenClient.getTaskInfo(jobId);
-    Assert.assertTrue("Cache File1 has not been localize",
-        checkLocalization(taskInfos,cacheFile1));
-    Assert.assertTrue("Cache File2 has not been localize",
-            checkLocalization(taskInfos,cacheFile2));
-    JobInfo jInfo = wovenClient.getJobInfo(jobId);
-    LOG.info("Waiting till the job is completed...");
-    while (!jInfo.getStatus().isJobComplete()) {
-      UtilsForTests.waitFor(100);
-      jInfo = wovenClient.getJobInfo(jobId);
-    }
-    Assert.assertEquals("Job has not been succeeded", 
-        jInfo.getStatus().getRunState(), JobStatus.SUCCEEDED);
-  }
-  
-  /**
-   * Run the job with distributed cache files and remove one cache
-   * file from the DFS when it is localized.verify whether the job
-   * is failed or not.
-   * @throws Exception
-   */
-  @Test
-  public void testDeleteCacheFileInDFSAfterLocalized() throws Exception {
-    conf = wovenClient.getDaemonConf();
-    SleepJob job = new SleepJob();
-    job.setConf(conf);
-    JobConf jobConf = job.setupJobConf(4, 1, 4000, 4000, 1000, 1000);
-    cacheFileURI3 = createCacheFile(tmpFolderPath, cacheFile3);
-    DistributedCache.createSymlink(jobConf);
-    DistributedCache.addCacheFile(cacheFileURI3, jobConf);
-    RunningJob runJob = jobClient.submitJob(jobConf);
-    JobID jobId = runJob.getID();
-    Assert.assertTrue("Job has not been started for 1 min.", 
-        jtClient.isJobStarted(jobId));
-    TaskInfo[] taskInfos = wovenClient.getTaskInfo(jobId);
-    boolean iscacheFileLocalized = checkLocalization(taskInfos,cacheFile3);
-    Assert.assertTrue("CacheFile has not been localized", 
-        iscacheFileLocalized);
-    deleteCacheFile(new Path(tmpFolderPath, cacheFile3));
-    JobInfo jInfo = wovenClient.getJobInfo(jobId);
-    LOG.info("Waiting till the job is completed...");
-    while (!jInfo.getStatus().isJobComplete()) {
-      UtilsForTests.waitFor(100);
-      jInfo = wovenClient.getJobInfo(jobId);
-    }
-    Assert.assertEquals("Job has not been failed", 
-        jInfo.getStatus().getRunState(), JobStatus.FAILED);
-  }
-  
-  /**
-   * Run the job with two distribute cache files and the size of
-   * one file should be larger than local.cache.size.Verify 
-   * whether job is succeeded or not.
-   * @throws Exception
-   */
-  @Test
-  public void testCacheSizeExceeds() throws Exception {
-    conf = wovenClient.getDaemonConf();
-    SleepJob job = new SleepJob();
-    String jobArgs []= {"-D","local.cache.size=1024", 
-                        "-m", "4", 
-                        "-r", "2", 
-                        "-mt", "2000", 
-                        "-rt", "2000",
-                        "-recordt","100"};
-    JobConf jobConf = new JobConf(conf);
-    cacheFileURI4 = createCacheFile(tmpFolderPath, cacheFile4);
-    DistributedCache.createSymlink(jobConf);
-    DistributedCache.addCacheFile(cacheFileURI4, jobConf);
-    int countBeforeJS = jtClient.getClient().getAllJobs().length;
-    JobID prvJobId = jtClient.getClient().getAllJobs()[0].getJobID();
-    int exitCode = ToolRunner.run(jobConf,job,jobArgs);
-    Assert.assertEquals("Exit Code:", 0, exitCode);
-    int countAfterJS = jtClient.getClient().getAllJobs().length;
-    int counter = 0;
-    while (counter++ < 30 ) {
-      if (countBeforeJS == countAfterJS) {
-        UtilsForTests.waitFor(1000);
-        countAfterJS = jtClient.getClient().getAllJobs().length;
-      } else {
-        break;
-      } 
-    }
-    JobID jobId = jtClient.getClient().getAllJobs()[0].getJobID();
-    counter = 0;
-    while (counter++ < 30) {
-      if (jobId.toString().equals(prvJobId.toString())) {
-        UtilsForTests.waitFor(1000); 
-        jobId = jtClient.getClient().getAllJobs()[0].getJobID();
-      } else { 
-        break;
-      }
-    }
-    JobInfo jInfo = wovenClient.getJobInfo(jobId);
-    Assert.assertEquals("Job has not been succeeded", 
-          jInfo.getStatus().getRunState(), JobStatus.SUCCEEDED);
-  }
-  
-  private boolean checkLocalization(TaskInfo[] taskInfos, String cacheFile) 
-      throws Exception {
-    boolean iscacheFileLocalized = false;
-    for (TaskInfo taskinfo : taskInfos) {
-      if (!taskinfo.isSetupOrCleanup()) {
-        String[] taskTrackers = taskinfo.getTaskTrackers();
-        List<TTClient> ttList = getTTClients(taskTrackers);
-        for (TTClient ttClient : ttList) {
-          iscacheFileLocalized = checkCacheFile(ttClient,cacheFile);
-          if(iscacheFileLocalized) {
-            return true;
-          }
-        } 
-      }
-    }
-    return false;
-  }
-  
-  private List<TTClient> getTTClients(String[] taskTrackers) 
-      throws Exception {
-    List<TTClient> ttClientList= new ArrayList<TTClient>();
-    for (String taskTracker: taskTrackers) {
-      taskTracker = UtilsForTests.getFQDNofTT(taskTracker);
-      TTClient ttClient = cluster.getTTClient(taskTracker);
-      if (ttClient != null) {
-        ttClientList.add(ttClient);
-      }
-    }
-    return ttClientList;
-  }
-  
-  private boolean checkCacheFile(TTClient ttClient, String cacheFile) 
-      throws IOException {
-    String[] localDirs = ttClient.getMapredLocalDirs();
-    for (String localDir : localDirs) {
-      localDir = localDir + Path.SEPARATOR + 
-          TaskTracker.getPublicDistributedCacheDir();
-      FileStatus[] fileStatuses = ttClient.listStatus(localDir, 
-          true, true);
-      for (FileStatus  fileStatus : fileStatuses) {
-        Path path = fileStatus.getPath();
-        if ((path.toString()).endsWith(cacheFile)) {
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-  
-  private static void deleteCacheFile(Path cacheFilePath) 
-      throws IOException {
-    FileSystem dfs = jobClient.getFs();
-    dfs.delete(cacheFilePath, true);
-  }
-
-  private static URI createCacheFile(Path tmpFolderPath, String cacheFile) 
-      throws IOException {
-    String input = "distribute cache content...";
-    FileSystem dfs = jobClient.getFs();
-    conf = wovenClient.getDaemonConf();
-    FileSystem fs = tmpFolderPath.getFileSystem(conf);
-    if (!fs.mkdirs(tmpFolderPath)) {
-        throw new IOException("Failed to create the temp directory:" 
-            + tmpFolderPath.toString());
-      }
-    deleteCacheFile(new Path(tmpFolderPath, cacheFile));
-    DataOutputStream file = fs.create(new Path(tmpFolderPath, cacheFile));
-    int i = 0;
-    while(i++ < 100) {
-      file.writeBytes(input);
-    }
-    file.close();
-    dfs.setPermission(tmpFolderPath, new FsPermission(FsAction.ALL, 
-        FsAction.ALL, FsAction.ALL));
-    URI uri = URI.create(new Path(tmpFolderPath, cacheFile).toString());
-    return uri;
-  }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JTClient;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import java.net.URI;
+import java.io.DataOutputStream;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Assert;
+import org.junit.Test;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Verify the job cache files localizations.
+ */
+public class TestCacheFileReferenceCount {
+  private static final Log LOG = LogFactory
+          .getLog(TestCacheFileReferenceCount.class);
+  private static Configuration conf = new Configuration();
+  private static MRCluster cluster;
+  private static Path tmpFolderPath = null;
+  private static String cacheFile1 = "cache1.txt";
+  private static String cacheFile2 = "cache2.txt";
+  private static String cacheFile3 = "cache3.txt";
+  private static String cacheFile4 = "cache4.txt";
+  private static JTProtocol wovenClient = null;
+  private static JobClient jobClient = null;
+  private static JTClient jtClient = null;
+  private static URI cacheFileURI1;
+  private static URI cacheFileURI2;
+  private static URI cacheFileURI3;
+  private static URI cacheFileURI4;
+  
+  @BeforeClass
+  public static void before() throws Exception {
+    cluster = MRCluster.createCluster(conf);
+    cluster.setUp();
+    tmpFolderPath = new Path("hdfs:///tmp");
+    jtClient = cluster.getJTClient();
+    jobClient = jtClient.getClient();
+    wovenClient = cluster.getJTClient().getProxy();
+    cacheFileURI1 = createCacheFile(tmpFolderPath, cacheFile1);
+    cacheFileURI2 = createCacheFile(tmpFolderPath, cacheFile2);
+  }
+  
+  @AfterClass
+  public static void after() throws Exception {
+    deleteCacheFile(new Path(tmpFolderPath, cacheFile1));
+    deleteCacheFile(new Path(tmpFolderPath, cacheFile2));
+    deleteCacheFile(new Path(tmpFolderPath, cacheFile4));
+    cluster.tearDown();
+  }
+  
+  /**
+   * Run the job with two distributed cache files and verify
+   * whether job is succeeded or not.
+   * @throws Exception
+   */
+  @Test
+  public void testCacheFilesLocalization() throws Exception {
+    conf = wovenClient.getDaemonConf();
+    SleepJob job = new SleepJob();
+    job.setConf(conf);
+    JobConf jobConf = job.setupJobConf(4, 1, 4000, 4000, 1000, 1000);
+    DistributedCache.createSymlink(jobConf);
+    DistributedCache.addCacheFile(cacheFileURI1, jobConf);
+    DistributedCache.addCacheFile(cacheFileURI2, jobConf);
+    RunningJob runJob = jobClient.submitJob(jobConf);
+    JobID jobId = runJob.getID();
+
+    Assert.assertTrue("Job has not been started for 1 min.", 
+        jtClient.isJobStarted(jobId));
+    TaskInfo[] taskInfos = wovenClient.getTaskInfo(jobId);
+    Assert.assertTrue("Cache File1 has not been localize",
+        checkLocalization(taskInfos,cacheFile1));
+    Assert.assertTrue("Cache File2 has not been localize",
+            checkLocalization(taskInfos,cacheFile2));
+    JobInfo jInfo = wovenClient.getJobInfo(jobId);
+    LOG.info("Waiting till the job is completed...");
+    while (!jInfo.getStatus().isJobComplete()) {
+      UtilsForTests.waitFor(100);
+      jInfo = wovenClient.getJobInfo(jobId);
+    }
+    Assert.assertEquals("Job has not been succeeded", 
+        jInfo.getStatus().getRunState(), JobStatus.SUCCEEDED);
+  }
+  
+  /**
+   * Run the job with distributed cache files and remove one cache
+   * file from the DFS when it is localized.verify whether the job
+   * is failed or not.
+   * @throws Exception
+   */
+  @Test
+  public void testDeleteCacheFileInDFSAfterLocalized() throws Exception {
+    conf = wovenClient.getDaemonConf();
+    SleepJob job = new SleepJob();
+    job.setConf(conf);
+    JobConf jobConf = job.setupJobConf(4, 1, 4000, 4000, 1000, 1000);
+    cacheFileURI3 = createCacheFile(tmpFolderPath, cacheFile3);
+    DistributedCache.createSymlink(jobConf);
+    DistributedCache.addCacheFile(cacheFileURI3, jobConf);
+    RunningJob runJob = jobClient.submitJob(jobConf);
+    JobID jobId = runJob.getID();
+    Assert.assertTrue("Job has not been started for 1 min.", 
+        jtClient.isJobStarted(jobId));
+    TaskInfo[] taskInfos = wovenClient.getTaskInfo(jobId);
+    boolean iscacheFileLocalized = checkLocalization(taskInfos,cacheFile3);
+    Assert.assertTrue("CacheFile has not been localized", 
+        iscacheFileLocalized);
+    deleteCacheFile(new Path(tmpFolderPath, cacheFile3));
+    JobInfo jInfo = wovenClient.getJobInfo(jobId);
+    LOG.info("Waiting till the job is completed...");
+    while (!jInfo.getStatus().isJobComplete()) {
+      UtilsForTests.waitFor(100);
+      jInfo = wovenClient.getJobInfo(jobId);
+    }
+    Assert.assertEquals("Job has not been failed", 
+        jInfo.getStatus().getRunState(), JobStatus.FAILED);
+  }
+  
+  /**
+   * Run the job with two distribute cache files and the size of
+   * one file should be larger than local.cache.size.Verify 
+   * whether job is succeeded or not.
+   * @throws Exception
+   */
+  @Test
+  public void testCacheSizeExceeds() throws Exception {
+    conf = wovenClient.getDaemonConf();
+    SleepJob job = new SleepJob();
+    String jobArgs []= {"-D","local.cache.size=1024", 
+                        "-m", "4", 
+                        "-r", "2", 
+                        "-mt", "2000", 
+                        "-rt", "2000",
+                        "-recordt","100"};
+    JobConf jobConf = new JobConf(conf);
+    cacheFileURI4 = createCacheFile(tmpFolderPath, cacheFile4);
+    DistributedCache.createSymlink(jobConf);
+    DistributedCache.addCacheFile(cacheFileURI4, jobConf);
+    int countBeforeJS = jtClient.getClient().getAllJobs().length;
+    JobID prvJobId = jtClient.getClient().getAllJobs()[0].getJobID();
+    int exitCode = ToolRunner.run(jobConf,job,jobArgs);
+    Assert.assertEquals("Exit Code:", 0, exitCode);
+    int countAfterJS = jtClient.getClient().getAllJobs().length;
+    int counter = 0;
+    while (counter++ < 30 ) {
+      if (countBeforeJS == countAfterJS) {
+        UtilsForTests.waitFor(1000);
+        countAfterJS = jtClient.getClient().getAllJobs().length;
+      } else {
+        break;
+      } 
+    }
+    JobID jobId = jtClient.getClient().getAllJobs()[0].getJobID();
+    counter = 0;
+    while (counter++ < 30) {
+      if (jobId.toString().equals(prvJobId.toString())) {
+        UtilsForTests.waitFor(1000); 
+        jobId = jtClient.getClient().getAllJobs()[0].getJobID();
+      } else { 
+        break;
+      }
+    }
+    JobInfo jInfo = wovenClient.getJobInfo(jobId);
+    Assert.assertEquals("Job has not been succeeded", 
+          jInfo.getStatus().getRunState(), JobStatus.SUCCEEDED);
+  }
+  
+  private boolean checkLocalization(TaskInfo[] taskInfos, String cacheFile) 
+      throws Exception {
+    boolean iscacheFileLocalized = false;
+    for (TaskInfo taskinfo : taskInfos) {
+      if (!taskinfo.isSetupOrCleanup()) {
+        String[] taskTrackers = taskinfo.getTaskTrackers();
+        List<TTClient> ttList = getTTClients(taskTrackers);
+        for (TTClient ttClient : ttList) {
+          iscacheFileLocalized = checkCacheFile(ttClient,cacheFile);
+          if(iscacheFileLocalized) {
+            return true;
+          }
+        } 
+      }
+    }
+    return false;
+  }
+  
+  private List<TTClient> getTTClients(String[] taskTrackers) 
+      throws Exception {
+    List<TTClient> ttClientList= new ArrayList<TTClient>();
+    for (String taskTracker: taskTrackers) {
+      taskTracker = UtilsForTests.getFQDNofTT(taskTracker);
+      TTClient ttClient = cluster.getTTClient(taskTracker);
+      if (ttClient != null) {
+        ttClientList.add(ttClient);
+      }
+    }
+    return ttClientList;
+  }
+  
+  private boolean checkCacheFile(TTClient ttClient, String cacheFile) 
+      throws IOException {
+    String[] localDirs = ttClient.getMapredLocalDirs();
+    for (String localDir : localDirs) {
+      localDir = localDir + Path.SEPARATOR + 
+          TaskTracker.getPublicDistributedCacheDir();
+      FileStatus[] fileStatuses = ttClient.listStatus(localDir, 
+          true, true);
+      for (FileStatus  fileStatus : fileStatuses) {
+        Path path = fileStatus.getPath();
+        if ((path.toString()).endsWith(cacheFile)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+  
+  private static void deleteCacheFile(Path cacheFilePath) 
+      throws IOException {
+    FileSystem dfs = jobClient.getFs();
+    dfs.delete(cacheFilePath, true);
+  }
+
+  private static URI createCacheFile(Path tmpFolderPath, String cacheFile) 
+      throws IOException {
+    String input = "distribute cache content...";
+    FileSystem dfs = jobClient.getFs();
+    conf = wovenClient.getDaemonConf();
+    FileSystem fs = tmpFolderPath.getFileSystem(conf);
+    if (!fs.mkdirs(tmpFolderPath)) {
+        throw new IOException("Failed to create the temp directory:" 
+            + tmpFolderPath.toString());
+      }
+    deleteCacheFile(new Path(tmpFolderPath, cacheFile));
+    DataOutputStream file = fs.create(new Path(tmpFolderPath, cacheFile));
+    int i = 0;
+    while(i++ < 100) {
+      file.writeBytes(input);
+    }
+    file.close();
+    dfs.setPermission(new Path(tmpFolderPath, cacheFile), new FsPermission(FsAction.ALL, 
+        FsAction.ALL, FsAction.ALL));
+    URI uri = URI.create(new Path(tmpFolderPath, cacheFile).toString());
+    return uri;
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java?rev=1077640&r1=1077639&r2=1077640&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java Fri Mar  4 04:39:22 2011
@@ -250,7 +250,13 @@ public class TestJobCacheDirectoriesClea
       String localTaskDir = localDir + "/" + 
           TaskTracker.getLocalTaskDir(getUser(), jobId.toString(), 
           taskAttID.toString() + "/work/");
-      if (ttClient.getFileStatus(localTaskDir,true).isDir()) {
+      boolean fstatus = false;
+      try {
+        fstatus = ttClient.getFileStatus(localTaskDir,true).isDir(); 
+      } catch(Exception exp) {
+        fstatus = false;
+      }
+      if (fstatus) {
          ttClient.createFile(localTaskDir, customFile, permission, true);
          ttClient.createFolder(localTaskDir, customFolder, permission, true);
          list.add(localTaskDir + customFile);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobSummary.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobSummary.java?rev=1077640&r1=1077639&r2=1077640&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobSummary.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobSummary.java Fri Mar  4 04:39:22 2011
@@ -89,6 +89,7 @@ public class TestJobSummary {
   public void testJobSummaryInfoOfKilledJob() throws IOException, 
           InterruptedException {
     SleepJob job = new SleepJob();
+    conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
     job.setConf(conf);
     conf = job.setupJobConf(2, 1, 4000, 4000, 100, 100);
     JobConf jobConf = new JobConf(conf);
@@ -113,6 +114,7 @@ public class TestJobSummary {
   public void testJobSummaryInfoOfFailedJob() throws IOException, 
           InterruptedException {
     conf = remoteJTClient.getDaemonConf();
+    conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
     JobConf jobConf = new JobConf(conf);
     jobConf.setJobName("Fail Job");
     jobConf.setJarByClass(GenerateTaskChildProcess.class);
@@ -147,6 +149,7 @@ public class TestJobSummary {
     SleepJob job = new SleepJob();
     job.setConf(conf);
     conf = job.setupJobConf(2, 1, 4000, 4000, 100, 100);
+    conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
     JobConf jobConf = new JobConf(conf);
     JobQueueInfo [] queues = jobClient.getQueues();
     for (JobQueueInfo queueInfo : queues ){