You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by co...@apache.org on 2010/06/21 21:02:51 UTC

svn commit: r956666 [4/4] - in /hadoop/mapreduce/trunk: ./ ivy/ src/test/aop/build/ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/testjar/ src/test/system/ src/test/system/aop/ src/test/system/aop/org/ src/test/system/aop/org/apache/ src/te...

Added: hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestTaskKilling.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestTaskKilling.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestTaskKilling.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestTaskKilling.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,640 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
+import org.apache.hadoop.mapred.JobClient.NetworkedJob;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+/**
+ * A System test for verifying the status after killing the tasks at different
+ * conditions.
+ */
+public class TestTaskKilling {
+  private static final Log LOG = LogFactory.getLog(TestTaskKilling.class);
+  private static MRCluster cluster;
+  private static JobClient jobClient = null;
+  private static JTProtocol remoteJTClient = null;
+
+  public TestTaskKilling() {
+  }
+
+  @BeforeClass
+  public static void before() throws Exception {
+    Configuration conf = new Configuration();
+    cluster = MRCluster.createCluster(conf);
+    cluster.setUp();
+    jobClient = cluster.getJTClient().getClient();
+    remoteJTClient = cluster.getJTClient().getProxy();
+  }
+
+  @AfterClass
+  public static void after() throws Exception {
+    cluster.tearDown();
+  }
+
+  /**
+   * Verifying the running job status whether it succeeds or not after failing
+   * some of its tasks.
+   * 
+   * @throws ClassNotFoundException
+   */
+  @Test
+  public void testFailedTaskJobStatus()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    Configuration conf = new Configuration(cluster.getConf());
+    TaskInfo taskInfo = null;
+    SleepJob job = new SleepJob();
+    job.setConf(conf);
+    Job slpJob = job.createJob(3, 1, 4000, 4000, 100, 100);
+    JobConf jobConf = new JobConf(conf);
+    jobConf.setMaxMapAttempts(20);
+    jobConf.setMaxReduceAttempts(20);
+    slpJob.submit();
+    RunningJob runJob =
+        jobClient.getJob(org.apache.hadoop.mapred.JobID.downgrade(slpJob
+            .getJobID()));
+    JobID id = runJob.getID();
+    JobInfo jInfo = remoteJTClient.getJobInfo(id);
+    int counter = 0;
+    while (counter < 60) {
+      if (jInfo.getStatus().getRunState() == JobStatus.RUNNING) {
+        break;
+      } else {
+        UtilsForTests.waitFor(1000);
+        jInfo = remoteJTClient.getJobInfo(id);
+      }
+      counter++;
+    }
+    Assert.assertTrue("Job has not been started for 1 min.", counter != 60);
+
+    TaskInfo[] taskInfos = remoteJTClient.getTaskInfo(id);
+    for (TaskInfo taskinfo : taskInfos) {
+      if (!taskinfo.isSetupOrCleanup()) {
+        taskInfo = taskinfo;
+      }
+    }
+
+    counter = 0;
+    taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
+    while (counter < 60) {
+      if (taskInfo.getTaskStatus().length > 0) {
+        if (taskInfo.getTaskStatus()[0].getRunState() == TaskStatus.State.RUNNING) {
+          break;
+        }
+      }
+      UtilsForTests.waitFor(1000);
+      taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
+      counter++;
+    }
+    Assert.assertTrue("Task has not been started for 1 min.", counter != 60);
+
+    NetworkedJob networkJob = jobClient.new NetworkedJob(jInfo.getStatus());
+    TaskID tID = TaskID.downgrade(taskInfo.getTaskID());
+    TaskAttemptID taskAttID = new TaskAttemptID(tID, 0);
+    networkJob.killTask(taskAttID, false);
+
+    LOG.info("Waiting till the job is completed...");
+    while (!jInfo.getStatus().isJobComplete()) {
+      UtilsForTests.waitFor(100);
+      jInfo = remoteJTClient.getJobInfo(id);
+    }
+
+    Assert.assertEquals(
+        "JobStatus", jInfo.getStatus().getRunState(), JobStatus.SUCCEEDED);
+  }
+
+  /**
+   * Verifying whether task temporary output directory is cleaned up or not
+   * after killing the task.
+   */
+  @Test
+  public void testDirCleanupAfterTaskKilled()
+      throws IOException, InterruptedException {
+    TaskInfo taskInfo = null;
+    boolean isTempFolderExists = false;
+    String localTaskDir = null;
+    TTClient ttClient = null;
+    TaskID tID = null;
+    FileStatus filesStatus[] = null;
+    Path inputDir = new Path("input");
+    Path outputDir = new Path("output");
+    Configuration conf = new Configuration(cluster.getConf());
+    JobConf jconf = new JobConf(conf);
+    jconf.setJobName("Word Count");
+    jconf.setJarByClass(WordCount.class);
+    jconf.setMapperClass(WordCount.MapClass.class);
+    jconf.setCombinerClass(WordCount.Reduce.class);
+    jconf.setReducerClass(WordCount.Reduce.class);
+    jconf.setNumMapTasks(1);
+    jconf.setNumReduceTasks(1);
+    jconf.setMaxMapAttempts(20);
+    jconf.setMaxReduceAttempts(20);
+    jconf.setOutputKeyClass(Text.class);
+    jconf.setOutputValueClass(IntWritable.class);
+
+    cleanup(inputDir, conf);
+    cleanup(outputDir, conf);
+    createInput(inputDir, conf);
+    FileInputFormat.setInputPaths(jconf, inputDir);
+    FileOutputFormat.setOutputPath(jconf, outputDir);
+    RunningJob runJob = jobClient.submitJob(jconf);
+    JobID id = runJob.getID();
+    JobInfo jInfo = remoteJTClient.getJobInfo(id);
+    int counter = 0;
+    while (counter < 60) {
+      if (jInfo.getStatus().getRunState() == JobStatus.RUNNING) {
+        break;
+      } else {
+        UtilsForTests.waitFor(1000);
+        jInfo = remoteJTClient.getJobInfo(id);
+      }
+      counter++;
+    }
+    Assert.assertTrue("Job has not been started for 1 min.", counter != 60);
+
+    JobStatus[] jobStatus = jobClient.getAllJobs();
+    String userName = jobStatus[0].getUsername();
+    TaskInfo[] taskInfos = remoteJTClient.getTaskInfo(id);
+    for (TaskInfo taskinfo : taskInfos) {
+      if (!taskinfo.isSetupOrCleanup()) {
+        taskInfo = taskinfo;
+        break;
+      }
+    }
+
+    counter = 0;
+    while (counter < 30) {
+      if (taskInfo.getTaskStatus().length > 0) {
+        if (taskInfo.getTaskStatus()[0].getRunState() == TaskStatus.State.RUNNING) {
+          break;
+        }
+      }
+      UtilsForTests.waitFor(1000);
+      taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
+      counter++;
+    }
+    Assert.assertTrue("Task has not been started for 30 sec.", counter != 30);
+
+    tID = TaskID.downgrade(taskInfo.getTaskID());
+    FinishTaskControlAction action = new FinishTaskControlAction(tID);
+
+    String[] taskTrackers = taskInfo.getTaskTrackers();
+    counter = 0;
+    while (counter < 30) {
+      if (taskTrackers.length != 0) {
+        break;
+      }
+      UtilsForTests.waitFor(100);
+      taskTrackers = taskInfo.getTaskTrackers();
+      counter++;
+    }
+
+    String hostName = taskTrackers[0].split("_")[1];
+    hostName = hostName.split(":")[0];
+    ttClient = cluster.getTTClient(hostName);
+    ttClient.getProxy().sendAction(action);
+    String localDirs[] = ttClient.getMapredLocalDirs();
+    TaskAttemptID taskAttID = new TaskAttemptID(tID, 0);
+    for (String localDir : localDirs) {
+      localTaskDir =
+          localDir
+              + "/"
+              + TaskTracker.getLocalTaskDir(userName, id.toString(), taskAttID
+                  .toString());
+      filesStatus = ttClient.listStatus(localTaskDir, true);
+      if (filesStatus.length > 0) {
+        isTempFolderExists = true;
+        NetworkedJob networkJob = jobClient.new NetworkedJob(jInfo.getStatus());
+        networkJob.killTask(taskAttID, false);
+        break;
+      }
+    }
+
+    Assert.assertTrue(
+        "Task Attempt directory "
+            + taskAttID + " has not been found while task was running.",
+        isTempFolderExists);
+    taskInfo = remoteJTClient.getTaskInfo(tID);
+
+    counter = 0;
+    while (counter < 60) {
+      UtilsForTests.waitFor(1000);
+      taskInfo = remoteJTClient.getTaskInfo(tID);
+      filesStatus = ttClient.listStatus(localTaskDir, true);
+      if (filesStatus.length == 0) {
+        break;
+      }
+      counter++;
+    }
+
+    Assert.assertTrue(
+        "Task attempt temporary folder has not been cleaned.",
+        isTempFolderExists && filesStatus.length == 0);
+    counter = 0;
+    while (counter < 30) {
+      UtilsForTests.waitFor(1000);
+      taskInfo = remoteJTClient.getTaskInfo(tID);
+      counter++;
+    }
+    taskInfo = remoteJTClient.getTaskInfo(tID);
+    Assert.assertEquals(
+        "Task status has not been changed to KILLED.", TaskStatus.State.KILLED,
+        taskInfo.getTaskStatus()[0].getRunState());
+  }
+
+  private void cleanup(Path dir, Configuration conf) throws IOException {
+    FileSystem fs = dir.getFileSystem(conf);
+    fs.delete(dir, true);
+  }
+
+  private void createInput(Path inDir, Configuration conf) throws IOException {
+    String input =
+        "Hadoop is framework for data intensive distributed "
+            + "applications.\n"
+            + "Hadoop enables applications to work with thousands of nodes.";
+    FileSystem fs = inDir.getFileSystem(conf);
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Failed to create the input directory:"
+          + inDir.toString());
+    }
+    fs.setPermission(inDir, new FsPermission(
+        FsAction.ALL, FsAction.ALL, FsAction.ALL));
+    DataOutputStream file = fs.create(new Path(inDir, "data.txt"));
+    int i = 0;
+    while (i < 1000 * 3000) {
+      file.writeBytes(input);
+      i++;
+    }
+    file.close();
+  }
+
+  /**
+   * Verifying whether task temporary output directory is cleaned up or not
+   * after failing the task.
+   */
+  @Test
+  public void testDirCleanupAfterTaskFailed()
+      throws IOException, InterruptedException {
+    TTClient ttClient = null;
+    FileStatus filesStatus[] = null;
+    String localTaskDir = null;
+    TaskInfo taskInfo = null;
+    TaskID tID = null;
+    boolean isTempFolderExists = false;
+    Path inputDir = new Path("input");
+    Path outputDir = new Path("output");
+    Configuration conf = new Configuration(cluster.getConf());
+    JobConf jconf = new JobConf(conf);
+    jconf.setJobName("Task Failed job");
+    jconf.setJarByClass(UtilsForTests.class);
+    jconf.setMapperClass(FailedMapperClass.class);
+    jconf.setNumMapTasks(1);
+    jconf.setNumReduceTasks(0);
+    jconf.setMaxMapAttempts(1);
+    cleanup(inputDir, conf);
+    cleanup(outputDir, conf);
+    createInput(inputDir, conf);
+    FileInputFormat.setInputPaths(jconf, inputDir);
+    FileOutputFormat.setOutputPath(jconf, outputDir);
+    RunningJob runJob = jobClient.submitJob(jconf);
+    JobID id = runJob.getID();
+    JobInfo jInfo = remoteJTClient.getJobInfo(id);
+
+    int counter = 0;
+    while (counter < 60) {
+      if (jInfo.getStatus().getRunState() == JobStatus.RUNNING) {
+        break;
+      } else {
+        UtilsForTests.waitFor(1000);
+        jInfo = remoteJTClient.getJobInfo(id);
+      }
+      counter++;
+    }
+    Assert.assertTrue("Job has not been started for 1 min.", counter != 60);
+
+    JobStatus[] jobStatus = jobClient.getAllJobs();
+    String userName = jobStatus[0].getUsername();
+    TaskInfo[] taskInfos = remoteJTClient.getTaskInfo(id);
+    for (TaskInfo taskinfo : taskInfos) {
+      if (!taskinfo.isSetupOrCleanup()) {
+        taskInfo = taskinfo;
+        break;
+      }
+    }
+
+    tID = TaskID.downgrade(taskInfo.getTaskID());
+    FinishTaskControlAction action = new FinishTaskControlAction(tID);
+    String[] taskTrackers = taskInfo.getTaskTrackers();
+    counter = 0;
+    while (counter < 30) {
+      if (taskTrackers.length != 0) {
+        break;
+      }
+      UtilsForTests.waitFor(1000);
+      taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
+      taskTrackers = taskInfo.getTaskTrackers();
+      counter++;
+    }
+    Assert.assertTrue("Task tracker not found.", taskTrackers.length != 0);
+    String hostName = taskTrackers[0].split("_")[1];
+    hostName = hostName.split(":")[0];
+    ttClient = cluster.getTTClient(hostName);
+    ttClient.getProxy().sendAction(action);
+
+    counter = 0;
+    while (counter < 60) {
+      if (taskInfo.getTaskStatus().length > 0) {
+        if (taskInfo.getTaskStatus()[0].getRunState() == TaskStatus.State.RUNNING) {
+          break;
+        }
+      }
+      UtilsForTests.waitFor(1000);
+      taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
+      counter++;
+    }
+    Assert.assertTrue("Task has not been started for 1 min.", counter != 60);
+
+    String localDirs[] = ttClient.getMapredLocalDirs();
+    TaskAttemptID taskAttID = new TaskAttemptID(tID, 0);
+    for (String localDir : localDirs) {
+      localTaskDir =
+          localDir
+              + "/"
+              + TaskTracker.getLocalTaskDir(userName, id.toString(), taskAttID
+                  .toString());
+      filesStatus = ttClient.listStatus(localTaskDir, true);
+      if (filesStatus.length > 0) {
+        isTempFolderExists = true;
+        break;
+      }
+    }
+
+    taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
+    Assert.assertTrue(
+        "Task Attempt directory "
+            + taskAttID + " has not been found while task was running.",
+        isTempFolderExists);
+    counter = 0;
+    while (counter < 30) {
+      UtilsForTests.waitFor(1000);
+      taskInfo = remoteJTClient.getTaskInfo(tID);
+      counter++;
+    }
+
+    Assert.assertEquals("Task status has not been changed to FAILED.", taskInfo
+        .getTaskStatus()[0].getRunState(), TaskStatus.State.FAILED);
+
+    filesStatus = ttClient.listStatus(localTaskDir, true);
+    Assert.assertTrue(
+        "Temporary folder has not been cleanup.", filesStatus.length == 0);
+  }
+
+  public static class FailedMapperClass
+      implements Mapper<NullWritable, NullWritable, NullWritable, NullWritable> {
+    public void configure(JobConf job) {
+    }
+
+    public void map(
+        NullWritable key, NullWritable value,
+        OutputCollector<NullWritable, NullWritable> output, Reporter reporter)
+        throws IOException {
+      int counter = 0;
+      while (counter < 240) {
+        UtilsForTests.waitFor(1000);
+        counter++;
+      }
+      if (counter == 240) {
+        throw new IOException();
+      }
+    }
+
+    public void close() {
+    }
+  }
+
+  @Test
+  /**
+   * This tests verification of job killing by killing of all task 
+   * attempts of a particular task
+   * @param none
+   * @return void
+   */
+  public void testAllTaskAttemptKill() throws Exception {
+    Configuration conf = new Configuration(cluster.getConf());
+
+    JobStatus[] jobStatus = null;
+
+    SleepJob job = new SleepJob();
+    job.setConf(conf);
+    Job slpJob = job.createJob(3, 1, 40000, 1000, 100, 100);
+    JobConf jconf = new JobConf(conf);
+
+    // Submitting the job
+    slpJob.submit();
+    RunningJob rJob =
+        cluster.getJTClient().getClient().getJob(
+            org.apache.hadoop.mapred.JobID.downgrade(slpJob.getJobID()));
+
+    int MAX_MAP_TASK_ATTEMPTS =
+        Integer.parseInt(jconf.get(MRJobConfig.MAP_MAX_ATTEMPTS));
+
+    LOG.info("MAX_MAP_TASK_ATTEMPTS is : " + MAX_MAP_TASK_ATTEMPTS);
+
+    Assert.assertTrue(MAX_MAP_TASK_ATTEMPTS > 0);
+
+    TTClient tClient = null;
+    TTClient[] ttClients = null;
+
+    JobInfo jInfo = remoteJTClient.getJobInfo(rJob.getID());
+
+    // Assert if jobInfo is null
+    Assert.assertNotNull(jInfo.getStatus().getRunState());
+
+    // Wait for the job to start running.
+    while (jInfo.getStatus().getRunState() != JobStatus.RUNNING) {
+      try {
+        Thread.sleep(10000);
+      } catch (InterruptedException e) {
+      }
+      ;
+      jInfo = remoteJTClient.getJobInfo(rJob.getID());
+    }
+
+    // Temporarily store the jobid to use it later for comparision.
+    JobID jobidStore = rJob.getID();
+    jobidStore = JobID.downgrade(jobidStore);
+    LOG.info("job id is :" + jobidStore.toString());
+
+    TaskInfo[] taskInfos = null;
+
+    // After making sure that the job is running,
+    // the test execution has to make sure that
+    // at least one task has started running before continuing.
+    boolean runningCount = false;
+    int count = 0;
+    do {
+      taskInfos = cluster.getJTClient().getProxy().getTaskInfo(rJob.getID());
+      runningCount = false;
+      for (TaskInfo taskInfo : taskInfos) {
+        TaskStatus[] taskStatuses = taskInfo.getTaskStatus();
+        if (taskStatuses.length > 0) {
+          LOG.info("taskStatuses[0].getRunState() is :"
+              + taskStatuses[0].getRunState());
+          if (taskStatuses[0].getRunState() == TaskStatus.State.RUNNING) {
+            runningCount = true;
+            break;
+          } else {
+            LOG.info("Sleeping 5 seconds");
+            Thread.sleep(5000);
+          }
+        }
+      }
+      count++;
+      // If the count goes beyond a point, then break; This is to avoid
+      // infinite loop under unforeseen circumstances. Testcase will anyway
+      // fail later.
+      if (count > 10) {
+        Assert.fail("Since the sleep count has reached beyond a point"
+            + "failing at this point");
+      }
+    } while (!runningCount);
+
+    // This whole module is about getting the task Attempt id
+    // of one task and killing it MAX_MAP_TASK_ATTEMPTS times,
+    // whenever it re-attempts to run.
+    String taskIdKilled = null;
+    for (int i = 0; i < MAX_MAP_TASK_ATTEMPTS; i++) {
+      taskInfos = cluster.getJTClient().getProxy().getTaskInfo(rJob.getID());
+
+      for (TaskInfo taskInfo : taskInfos) {
+        TaskAttemptID taskAttemptID;
+        if (!taskInfo.isSetupOrCleanup()) {
+          // This is the task which is going to be killed continously in
+          // all its task attempts.The first task is getting picked up.
+          TaskID taskid = TaskID.downgrade(taskInfo.getTaskID());
+          LOG.info("taskid is :" + taskid);
+          if (i == 0) {
+            taskIdKilled = taskid.toString();
+            taskAttemptID = new TaskAttemptID(taskid, i);
+            LOG.info("taskAttemptid going to be killed is : " + taskAttemptID);
+            (jobClient.new NetworkedJob(jInfo.getStatus())).killTask(
+                taskAttemptID, true);
+            checkTaskCompletionEvent(taskAttemptID, jInfo);
+            break;
+          } else {
+            if (taskIdKilled.equals(taskid.toString())) {
+              taskAttemptID = new TaskAttemptID(taskid, i);
+              LOG
+                  .info("taskAttemptid going to be killed is : "
+                      + taskAttemptID);
+              (jobClient.new NetworkedJob(jInfo.getStatus())).killTask(
+                  taskAttemptID, true);
+              checkTaskCompletionEvent(taskAttemptID, jInfo);
+              break;
+            }
+          }
+        }
+      }
+    }
+    // Making sure that the job is complete.
+    while (jInfo != null && !jInfo.getStatus().isJobComplete()) {
+      Thread.sleep(10000);
+      jInfo = remoteJTClient.getJobInfo(rJob.getID());
+    }
+
+    // Making sure that the correct jobstatus is got from all the jobs
+    jobStatus = jobClient.getAllJobs();
+    JobStatus jobStatusFound = null;
+    for (JobStatus jobStatusTmp : jobStatus) {
+      if (JobID.downgrade(jobStatusTmp.getJobID()).equals(jobidStore)) {
+        jobStatusFound = jobStatusTmp;
+        LOG.info("jobStatus found is :" + jobStatusFound.getJobId().toString());
+      }
+    }
+
+    // Making sure that the job has FAILED
+    Assert.assertEquals(
+        "The job should have failed at this stage", JobStatus.FAILED,
+        jobStatusFound.getRunState());
+  }
+
+  // This method checks if task Attemptid occurs in the list
+  // of tasks that are completed (killed) for a job.This is
+  // required because after issuing a kill comamnd, the task
+  // has to be killed and appear in the taskCompletion event.
+  // After this a new task attempt will start running in a
+  // matter of few seconds.
+  public void checkTaskCompletionEvent(
+      TaskAttemptID taskAttemptID, JobInfo jInfo) throws Exception {
+    boolean match = false;
+    int count = 0;
+    while (!match) {
+      TaskCompletionEvent[] taskCompletionEvents =
+          jobClient.new NetworkedJob(jInfo.getStatus())
+              .getTaskCompletionEvents(0);
+      for (TaskCompletionEvent taskCompletionEvent : taskCompletionEvents) {
+        if ((taskCompletionEvent.getTaskAttemptId().toString())
+            .equals(taskAttemptID.toString())) {
+          match = true;
+          // Sleeping for 10 seconds giving time for the next task
+          // attempt to run
+          Thread.sleep(10000);
+          break;
+        }
+      }
+      if (!match) {
+        LOG.info("Thread is sleeping for 10 seconds");
+        Thread.sleep(10000);
+        count++;
+      }
+      // If the count goes beyond a point, then break; This is to avoid
+      // infinite loop under unforeseen circumstances.Testcase will anyway
+      // fail later.
+      if (count > 10) {
+        Assert.fail("Since the task attemptid is not appearing in the"
+            + "TaskCompletionEvent, it seems this task attempt was not killed");
+      }
+    }
+  }
+}

Added: hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestTaskOwner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestTaskOwner.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestTaskOwner.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestTaskOwner.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import testjar.UserNamePermission;
+
+public class TestTaskOwner {
+  private static final Log LOG = LogFactory.getLog(TestTaskOwner.class);
+  private static Path outDir = new Path("output");
+  private static Path inDir = new Path("input");
+  public static MRCluster cluster;
+
+  // The role of this job is to write the user name to the output file
+  // which will be parsed
+
+  @BeforeClass
+  public static void setUp() throws java.lang.Exception {
+
+    cluster = MRCluster.createCluster(new Configuration());
+    cluster.setUp();
+    FileSystem fs = inDir.getFileSystem(cluster.getJTClient().getConf());
+    // Make sure that all is clean in case last tearDown wasn't successful
+    fs.delete(outDir, true);
+    fs.delete(inDir, true);
+
+    fs.create(inDir, true);
+  }
+
+  @Test
+  public void testProcessPermission() throws Exception {
+  // The user will submit a job which a plain old map reduce job
+  // this job will output the username of the task that is running
+  // in the cluster and we will authenticate whether matches
+  // with the job that is submitted by the same user.
+
+    Configuration conf = cluster.getJTClient().getConf();
+    Job job = new Job(conf, "user name check");
+
+    job.setJarByClass(UserNamePermission.class);
+    job.setMapperClass(UserNamePermission.UserNameMapper.class);
+    job.setCombinerClass(UserNamePermission.UserNameReducer.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+
+    job.setReducerClass(UserNamePermission.UserNameReducer.class);
+    job.setNumReduceTasks(1);
+
+    FileInputFormat.addInputPath(job, inDir);
+    FileOutputFormat.setOutputPath(job, outDir);
+
+    job.waitForCompletion(true);
+
+    // now verify the user name that is written by the task tracker is same
+    // as the
+    // user name that was used to launch the task in the first place
+    FileSystem fs = outDir.getFileSystem(conf);
+
+    Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
+     new Utils.OutputFileUtils.OutputFilesFilter()));
+
+    for (int i = 0; i < fileList.length; ++i) {
+	  LOG.info("File list[" + i + "]" + ": " + fileList[i]);
+	  BufferedReader file = new BufferedReader(new InputStreamReader(fs
+      .open(fileList[i])));
+       String line = file.readLine();
+       while (line != null) {
+         StringTokenizer token = new StringTokenizer(line);
+         if (token.hasMoreTokens()) {
+           LOG.info("First token " + token.nextToken());
+           String userName = token.nextToken();
+
+           LOG.info("Next token " + userName);
+           Assert
+             .assertEquals(
+              "The user name did not match permission violation ",
+               userName, System.getProperty("user.name")
+              .toString());
+           break;
+         }
+        }
+        file.close();
+     }
+  }
+
+  @AfterClass
+  public static void tearDown() throws java.lang.Exception {
+    FileSystem fs = outDir.getFileSystem(cluster.getJTClient().getConf());
+    fs.delete(outDir, true);
+    fs.delete(inDir, true);
+    cluster.tearDown();
+   }
+}
+
+